activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From David Delbecq <david.delb...@oma.be>
Subject ActiveMQInputStream recommanded usage?
Date Wed, 26 Jan 2011 15:45:03 GMT
Hello,

i am in the following situation. Several activemq client will send big 
file. I made a simple unittest using ActiveMQInput/OutputStream, 10 
threads sending a 100M file and one thread reading one file at a time.

if i store the 10 files sequentially, no troubles
if i send files in parallel and  use the selector to distinguish between 
files, ActiveMQInputStream lock after receiving 1.8M. I understand this 
is because of queue page size and the selector applies only to message 
in first page. So packet from 9 other files prevent inputstream from 
receiving the rest of first file.

So i created 1 queue per file instead. However, it exhibits same 
behaviour. Looks like all my queues are sharing the same kahaDB page?

So what is the proper way to send big files over activemq without having 
files blocking each other? Is it possible, when i create my new queue 
for the file, that i specify it has it's own page? We can not afford to 
read all files in parallel!

If usefull, see below the code used for the test. Thanks in advance for 
any information you could have.


    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.HashMap;

    import javax.jms.DeliveryMode;
    import javax.jms.JMSException;
    import javax.jms.Queue;
    import javax.jms.QueueSession;
    import javax.jms.Session;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.commons.io.IOUtils;
    import org.testng.Assert;
    import org.testng.annotations.AfterTest;
    import org.testng.annotations.Test;

    public class activeMQMassiveSendFilesTest {
         TestingConfigurator configurator;

         @AfterTest
         public void clean() {
             configurator.cleanup();
         }

         @Test
         public void massiveTest() throws Exception {
             configurator = new TestingConfigurator();
             configurator.init();
             String[] names = new String[] { "bla", "blabla",
    "blablabla", "444",
                     "uuid123", "truc", "bidule", "machin", "alpha",
    "beta" };
             Thread[] t = new Thread[10];
             class enqueuer extends Thread {
                 private String id;

                 public enqueuer(String id) {
                     this.id = id;
                 }

                 @Override
                 public void run() {
                     // TODO Auto-generated method stub
                     try {
                         enqueueBigMessageContent(id, new
    MassiveInputStream(
                                 1024 * 1024 * 100));// 100m file
                     } catch (JMSException e) {
                         e.printStackTrace();
                     }
                 }
             }
             for (int i = 0; i < 10; i++)
                 t[i] = new enqueuer(names[i]);

             for (int i = 0; i < 10; i++)
                 t[i].start();

             for (int i = 0; i < 10; i++)
                 t[i].join();
             for (String name : names) {
                 Assert.assertTrue(dequeueBigMessageContent(name));
             }
         }

         private boolean dequeueBigMessageContent(String name) throws
    JMSException {
             ActiveMQConnection connection = (ActiveMQConnection)
    configurator.connectionFactory
                     .createConnection();
             connection.start();
             QueueSession queueSession =
    connection.createQueueSession(false,
                     Session.AUTO_ACKNOWLEDGE);
             String queueName = configurator.testQueue+"."+name;
             System.out.println("reading from "+queueName);
             Queue destination = queueSession.createQueue(queueName);
             InputStream is = connection.createInputStream(destination);
             int read=0;
             int count = 0;
             boolean ok=true;
             try {
                 while ((read=is.read())>=0){
                     ok=ok & ( ((count++)&0xFF)==read);
                     if (count%(1024*1024)==0)
                         System.out.println((count/1024/1024)+"M read");
                 }
                 is.close();
             } catch (IOException e) {
                 e.printStackTrace();
             }
             System.out.println("read
    "+(((float)count)/1024.0/1024.0)+"M elements from stream");
             return ok;
         }

         private void enqueueBigMessageContent(final String uid,
    InputStream source)
                 throws JMSException {
             HashMap<String, Object> map = new HashMap<String, Object>();
             map.put("fileuid", uid);
             // init map message
             ActiveMQConnection connection = (ActiveMQConnection)
    configurator.connectionFactory
                     .createConnection();
             connection.start();
             QueueSession queueSession =
    connection.createQueueSession(false,
                     Session.AUTO_ACKNOWLEDGE);

             String queueName = configurator.testQueue+"."+uid;
             System.out.println("writing to "+queueName);
             Queue destination = queueSession.createQueue(queueName);
             OutputStream os =
    connection.createOutputStream(destination, map,
                     DeliveryMode.PERSISTENT, 4, 0);
             try {
                 System.out.println("start enqueuing " + uid);
                 IOUtils.copy(source, os);
                 os.close();
                 System.out.println("done enqueuing " + uid);
             } catch (IOException e) {
                 // TODO Auto-generated catch block
                 e.printStackTrace();
             }
             connection.close();
             System.out.println("connection closed for " + uid);
         }

         private static class MassiveInputStream extends InputStream {
             private long size;
             private long current;

             public MassiveInputStream(long size) {
                 this.size = size;
             }

             @Override
             public int read() throws IOException {
                 if (current > size)
                     return -1;
                 return (int) ((current++) & 0xFF);
             }
         }
    }


-- 
David Delbecq
ICT
Institut Royal Météorologique
Ext:557


Mime
View raw message