Hello,

I am in the following situation: several ActiveMQ clients will send big files. I made a simple unit test using ActiveMQInputStream/ActiveMQOutputStream, with 10 threads each sending a 100M file and one thread reading one file at a time.

If I store the 10 files sequentially, there is no trouble.
If I send the files in parallel and use a selector to distinguish between files, ActiveMQInputStream locks up after receiving 1.8M. I understand this is because of the queue page size: the selector applies only to messages in the first page, so packets from the 9 other files prevent the input stream from receiving the rest of the first file.

So I created one queue per file instead. However, it exhibits the same behaviour. It looks like all my queues are sharing the same KahaDB page?

So what is the proper way to send big files over ActiveMQ without having files block each other? Is it possible, when I create a new queue for a file, to specify that it has its own page? We cannot afford to read all files in parallel!

If useful, see below the code used for the test. Thanks in advance for any information you could have.


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.commons.io.IOUtils;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.Test;

public class activeMQMassiveSendFilesTest {
    TestingConfigurator configurator;

    @AfterTest
    public void clean() {
        configurator.cleanup();
    }

    @Test
    public void massiveTest() throws Exception {
        configurator = new TestingConfigurator();
        configurator.init();
        String[] names = new String[] { "bla", "blabla", "blablabla", "444",
                "uuid123", "truc", "bidule", "machin", "alpha", "beta" };
        Thread[] t = new Thread[10];
        class enqueuer extends Thread {
            private String id;

            public enqueuer(String id) {
                this.id = id;
            }

            @Override
            public void run() {
                // TODO Auto-generated method stub
                try {
                    enqueueBigMessageContent(id, new MassiveInputStream(
                            1024 * 1024 * 100));// 100m file
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
        for (int i = 0; i < 10; i++)
            t[i] = new enqueuer(names[i]);

        for (int i = 0; i < 10; i++)
            t[i].start();

        for (int i = 0; i < 10; i++)
            t[i].join();
        for (String name : names) {
            Assert.assertTrue(dequeueBigMessageContent(name));
        }
    }

    private boolean dequeueBigMessageContent(String name) throws JMSException {
        ActiveMQConnection connection = (ActiveMQConnection) configurator.connectionFactory
                .createConnection();
        connection.start();
        QueueSession queueSession = connection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
        String queueName = configurator.testQueue+"."+name;
        System.out.println("reading from "+queueName);
        Queue destination = queueSession.createQueue(queueName);
        InputStream is = connection.createInputStream(destination);
        int read=0;
        int count = 0;
        boolean ok=true;
        try {
            while ((read=is.read())>=0){
                ok=ok & ( ((count++)&0xFF)==read);   
                if (count%(1024*1024)==0)
                    System.out.println((count/1024/1024)+"M read");
            }
            is.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("read "+(((float)count)/1024.0/1024.0)+"M elements from stream");
        return ok;
    }

    private void enqueueBigMessageContent(final String uid, InputStream source)
            throws JMSException {
        HashMap<String, Object> map = new HashMap<String, Object>();
        map.put("fileuid", uid);
        // init map message
        ActiveMQConnection connection = (ActiveMQConnection) configurator.connectionFactory
                .createConnection();
        connection.start();
        QueueSession queueSession = connection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);

        String queueName = configurator.testQueue+"."+uid;
        System.out.println("writing to "+queueName);
        Queue destination = queueSession.createQueue(queueName);
        OutputStream os = connection.createOutputStream(destination, map,
                DeliveryMode.PERSISTENT, 4, 0);
        try {
            System.out.println("start enqueuing " + uid);
            IOUtils.copy(source, os);
            os.close();
            System.out.println("done enqueuing " + uid);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        connection.close();
        System.out.println("connection closed for " + uid);
    }

    private static class MassiveInputStream extends InputStream {
        private long size;
        private long current;

        public MassiveInputStream(long size) {
            this.size = size;
        }

        @Override
        public int read() throws IOException {
            if (current > size)
                return -1;
            return (int) ((current++) & 0xFF);
        }
    }
}

-- 
David Delbecq
ICT
Institut Royal Météorologique
Ext:557