activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Gabe Westmaas" <gabe.westm...@mailtrust.com>
Subject RE: Producer hanged when sending 6M ByteMessage
Date Thu, 21 Aug 2008 04:00:43 GMT
How are you persisting the message?  MySQL has a default max packet size of 1M, and I'm sure
other databases have similar limits.  

Also, if you aren't consuming the messages, you may exceed the memory limits on the broker. Check
the Home page on the web monitor and see if it's at 99%.  You can adjust how much memory is
available in the activemq.xml config file (and other places, I'm sure).

Gabe

-----Original Message-----
From: "yanhongsan" <xinchunli1982@163.com>
Sent: Wednesday, August 20, 2008 11:43pm
To: users@activemq.apache.org
Subject: Producer hanged when sending 6M ByteMessage


Hello, everyone. I downloaded the latest version, ActiveMQ 5.1.  When I send
a BytesMessage of about 6M to a queue 100 times in a loop, the producer hangs
after sending a message. The following is my code: 

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.mymq.test;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Date;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * A simple tool for publishing messages to an ActiveMQ destination.
 *
 * <p>With its default settings the tool connects to the broker at {@code url},
 * creates a producer for the queue named by {@code subject}, and sends
 * {@code messageCount} persistent {@link BytesMessage}s whose body is the
 * content of a local file. The bean-style setters allow the defaults to be
 * overridden before {@link #run()} is called.
 *
 * @version $Revision: 1.2 $
 */
public class ProducerTool {

    /** File whose bytes are used as the body of every message sent. */
    private static final String PAYLOAD_FILE = "E:\\bb.zip";

    private Destination destination;
    /** Number of messages to send; 0 means "send forever". */
    private int messageCount = 100;
    /** Pause between two sends, in milliseconds. */
    private long sleepTime;
    private boolean verbose = true;
    /** Length, in characters, of the text produced by createMessageText. */
    private int messageSize = 255;
    /** Producer time-to-live in milliseconds; 0 leaves the producer default untouched. */
    private long timeToLive = 0;
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = "tcp://129.1.5.95:61616";
    private String subject = "eastsoft";
    private boolean topic = false;
    private boolean transacted = false;
    private boolean persistent = true;

    /**
     * Runs the tool with its default settings.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ProducerTool producerTool = new ProducerTool();
        producerTool.run();
    }

    /**
     * Connects to the broker, sends the configured messages and releases the
     * JMS resources again. Any failure is reported on standard output instead
     * of being propagated.
     */
    public void run() {
        Connection connection = null;
        Session session = null;
        try {
            // Create the connection.
            ActiveMQConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory(user, password, url);
            connection = connectionFactory.createConnection();
            connection.start();

            // Create the session and the destination.
            session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
            if (topic) {
                destination = session.createTopic(subject);
            } else {
                destination = session.createQueue(subject);
            }

            // Create the producer.
            MessageProducer producer = session.createProducer(destination);
            if (persistent) {
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            } else {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            }
            if (timeToLive != 0) {
                producer.setTimeToLive(timeToLive);
            }

            // Start sending messages.
            sendLoop(session, producer);

            System.out.println("Done.");
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        } finally {
            closeQuietly(session, connection);
        }
    }

    /**
     * Closes the session and then the connection. Each resource is closed
     * independently so that a missing or failing session can never prevent
     * the connection from being closed.
     */
    private static void closeQuietly(Session session, Connection connection) {
        if (session != null) {
            try {
                session.close();
            } catch (Exception e) {
                // Best effort: report and carry on to close the connection.
                System.out.println("Failed to close session: " + e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                // Best effort: nothing more can be done at this point.
                System.out.println("Failed to close connection: " + e);
            }
        }
    }

    /**
     * Sends {@code messageCount} bytes messages (or an endless stream of them
     * when {@code messageCount} is 0), committing after each send when the
     * session is transacted and sleeping {@code sleepTime} ms between sends.
     *
     * @param session  session used to create the messages and to commit
     * @param producer producer used to send the messages
     * @throws Exception if the payload file cannot be read or a JMS call fails
     */
    protected void sendLoop(Session session, MessageProducer producer) throws Exception {
        // The payload is identical for every message, so read the file once
        // instead of once per iteration.
        byte[] content = getFileByte(PAYLOAD_FILE);

        for (int i = 0; i < messageCount || messageCount == 0; i++) {
            BytesMessage message = session.createBytesMessage();
            // Only used as a label for the log line below.
            // NOTE(review): JMS providers assign their own JMSMessageID on send - confirm.
            String id = "msg " + i;
            message.setJMSMessageID(id);
            message.writeBytes(content);
            System.out.println("Sending message: " + message.getJMSMessageID());
            producer.send(message);
            if (transacted) {
                session.commit();
            }

            Thread.sleep(sleepTime);
        }
    }

    /**
     * Builds a text payload of exactly {@code messageSize} characters: a short
     * header containing the index and the current date, truncated or padded
     * with spaces as required.
     */
    private String createMessageText(int index) {
        StringBuilder buffer = new StringBuilder(messageSize);
        buffer.append("Message: " + index + " sent at: " + new Date());
        if (buffer.length() > messageSize) {
            return buffer.substring(0, messageSize);
        }
        for (int i = buffer.length(); i < messageSize; i++) {
            buffer.append(' ');
        }
        return buffer.toString();
    }

    /**
     * Reads a whole file into memory.
     *
     * @param filename path of the file to read
     * @return the complete content of the file
     * @throws IOException if the file cannot be opened, is larger than a byte
     *         array can hold, or ends before all of its bytes were read
     */
    private byte[] getFileByte(String filename) throws IOException {
        File file = new File(filename);
        long length = file.length();
        if (length > Integer.MAX_VALUE) {
            throw new IOException("File too large to load into memory: " + filename);
        }

        byte[] buffer = new byte[(int) length];
        FileInputStream fin = new FileInputStream(file);
        try {
            // A single read() may return fewer bytes than requested, so keep
            // reading until the buffer is full.
            int offset = 0;
            while (offset < buffer.length) {
                int read = fin.read(buffer, offset, buffer.length - offset);
                if (read < 0) {
                    throw new IOException("Unexpected end of file after " + offset
                            + " of " + buffer.length + " bytes: " + filename);
                }
                offset += read;
            }
        } finally {
            fin.close();
        }
        return buffer;
    }

    public void setPersistent(boolean durable) {
        this.persistent = durable;
    }

    public void setMessageCount(int messageCount) {
        this.messageCount = messageCount;
    }

    public void setMessageSize(int messageSize) {
        this.messageSize = messageSize;
    }

    public void setPassword(String pwd) {
        this.password = pwd;
    }

    public void setSleepTime(long sleepTime) {
        this.sleepTime = sleepTime;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public void setTimeToLive(long timeToLive) {
        this.timeToLive = timeToLive;
    }

    public void setTopic(boolean topic) {
        this.topic = topic;
    }

    /** Convenience inverse of {@link #setTopic(boolean)}. */
    public void setQueue(boolean queue) {
        this.topic = !queue;
    }

    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public void setVerbose(boolean verbose) {
        this.verbose = verbose;
    }
}

The bb.zip file is about 6M. When I run the class, the output is:
Sending message: msg 0
Sending message: msg 1
But once I start consuming from the queue "eastsoft", I can send all 100 files.

-- 
View this message in context: http://www.nabble.com/Producer-hanged-when-sending-6M-ByteMessage-tp19082027p19082027.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.




Mime
View raw message