activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ireneshy <ireneson...@gmail.com>
Subject Re: Getting IllegalAgrEception while Sending BlobMessage
Date Wed, 14 Apr 2010 08:33:18 GMT

Hi,
     I'm using the version 'apache-activemq-5.3.1',and the code here. Maybe
something wrong with the configuration?
     The activemq.xml is like this:


    
    
        
            file:${activemq.base}/conf/credentials.properties
              
    

    
    
 
        
              
        
            
              
                " producerFlowControl="false" memoryLimit="1 gb">
                  
                    
                  
                
                " producerFlowControl="false" memoryLimit="1 gb">
                      
                   
					
				   

				  
                    
                  
                  
                
              
            
         
 
        
        
        
            
        

        
        
            
        
        
        
             
        
            
                
                    
                
                
                    
                
                
                    
                
            
        
		
		  
        
        
            
        

    

    

    
    
    



1. The producer
 import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
   
 public class ProducerTool {   
   
     private String user = ActiveMQConnection.DEFAULT_USER;   
   
     private String password = ActiveMQConnection.DEFAULT_PASSWORD;   
   
     private String url =
"tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";
  
   
     private String subject = "blob.queue";   
   
     private Destination destination = null;   
   
     private ActiveMQConnection connection = null;   
   
     private ActiveMQSession session = null;   
   
     private MessageProducer producer = null;   
   
     // 初始化   
     private void initialize() throws JMSException, Exception {   
         ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(   
                 user, password, url);   
         connection =
(ActiveMQConnection)connectionFactory.createConnection();   
         session = (ActiveMQSession)connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);   
         destination = session.createQueue(subject);   
         producer = session.createProducer(destination);   
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
         connection.setCopyMessageOnSend(false);
     }   
   
     // 发送消息   
     public void produceMessage(String filename) throws JMSException,
Exception {   
         initialize();   
         File file = new File(filename);
         BlobMessage msg = session.createBlobMessage(file);
         
         //设置一些所发送附件的属性
       msg.setStringProperty("fileName", file.getName());
         msg.setLongProperty("fileSize", file.length());
                  
         connection.start();   
         System.out.println("Producer:->Sending message: " +
file.getName()+",文件大小为:" + file.length()+"B");   
         //发送文件
       producer.send(msg);   
         System.out.println("Producer:->Message sent complete!");   
     }
         
     // 关闭连接   
     public void close() throws JMSException {   
         System.out.println("Producer:->Closing connection");   
         if (producer != null)   
             producer.close();   
         if (session != null)   
             session.close();   
         if (connection != null)   
             connection.close();   
     }   
 }    
2. The consumer

package blobMsg;

 import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.blob.BlobDownloadStrategy;
import org.apache.activemq.blob.FTPBlobDownloadStrategy;
import org.apache.activemq.command.ActiveMQBlobMessage;
 
 public class ConsumerTool implements MessageListener {   
   
     private String user = ActiveMQConnection.DEFAULT_USER;   
   
     private String password = ActiveMQConnection.DEFAULT_PASSWORD;   
   
     private String url =
"tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/";
  
   
     private String subject = "blob.queue";   
   
     private Destination destination = null;   
   
     private ActiveMQConnection connection = null;   
   
     private ActiveMQSession session = null;   
   
     private MessageConsumer consumer = null; 
       
     // 初始化   
     private void initialize() throws JMSException, Exception {   
         ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(   
                 user, password, url);   
         connection =
(ActiveMQConnection)connectionFactory.createConnection();   
         session = (ActiveMQSession)connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);   
         destination = session.createQueue(subject);   
         consumer = session.createConsumer(destination);
     }       
   
     // 消费消息   
     public void consumeMessage() throws JMSException, Exception {   
         initialize();   
         connection.start();   
         // 开始监听  
         System.out.println("Consumer:->Begin listening...");   
         // 设置监听器   
         consumer.setMessageListener(this);   
         // Message message = consumer.receive();   
     }   
   
     // 关闭连接   
     public void close() throws JMSException {   
         System.out.println("Consumer:->Closing connection");   
         if (consumer != null)   
             consumer.close();   
         if (session != null)   
             session.close();   
         if (connection != null)   
             connection.close();   
     }   
   
     // 消息处理函数   
     public void onMessage(Message message) {   
         try {   
             if (message instanceof BlobMessage) {   
             BlobMessage blobMsg = (BlobMessage) message;
            	 System.out.println("Consumer:->Received: " + blobMsg);
            	 String fileName = blobMsg.getStringProperty("fileName");
            	 System.out.println("fileName:"+fileName);
            	 long fileSize = blobMsg.getLongProperty("fileSize");
            	 System.out.println("fileSize:"+fileSize+"B");
            	             	
            	 InputStream inStr = blobMsg.getInputStream();
              BufferedInputStream bin = new BufferedInputStream(inStr);
            	 File file = new File("d:/test"+fileName);
            	 BufferedOutputStream bout = new BufferedOutputStream(new
FileOutputStream(file)); 
            	 
            	 byte[] block = new byte[1024];
            	 while (bin.read(block)!=-1) {            		 
            		 bout.write(block); 
            	 }        	 
            	 bout.close();  
            	 bin.close();
            	 inStr.close();
             } else {               	  
                 System.out.println("Consumer:->Received: " + message);   
             }   
         } catch (JMSException e) {   
             // TODO Auto-generated catch block   
             e.printStackTrace();   
         } catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} 
     }   
 }    
3. The test code

 import javax.jms.JMSException;   
   
 public class Test {   
   
        public static void main(String[] args) throws JMSException,
Exception {   
         // TODO Auto-generated method stub   
         ConsumerTool consumer = new ConsumerTool();   
         ProducerTool producer = new ProducerTool();   
         // 开始监听   
       consumer.consumeMessage();   
            
         // 延时500毫秒之后发送消息   
      Thread.sleep(500);   
         producer.produceMessage("d:/1.doc");   
         producer.close();   
            
         // 延时500毫秒之后停止接受消息   
       Thread.sleep(500);   
         consumer.close();   
       }   
 }    



Dejan Bosanac wrote:
> 
> Hi,
> 
> what version are you using and can you post your code?
> 
> Cheers
> --
> Dejan Bosanac - http://twitter.com/dejanb
> 
> Open Source Integration - http://fusesource.com/
> ActiveMQ in Action - http://www.manning.com/snyder/
> Blog - http://www.nighttale.net
> 
> 
> On Wed, Apr 14, 2010 at 5:11 AM, ireneshy  wrote:
> 
>>
>> Hi ,
>> I am trying a simple producer and consumer application for BlobMessage,
>> it
>> works  fine for files of size 8 kb.But if I send file of size more than
>> 64KB
>> , it throws an exception in the activemq console :
>>
>> ERROR log                            - EXCEPTION
>> java.lang.IllegalArgumentException
>>        at java.nio.Buffer.position(Buffer.java:218)
>>        at org.mortbay.io.nio.NIOBuffer.poke(NIOBuffer.java:142)
>>        at org.mortbay.io.AbstractBuffer.put(AbstractBuffer.java:391)
>>        at org.mortbay.jetty.HttpParser.reset(HttpParser.java:844)
>>        at
>> org.mortbay.jetty.HttpConnection.destroy(HttpConnection.java:131)
>>        at
>>
>> org.mortbay.jetty.AbstractConnector.connectionClosed(AbstractConnector.java:785)
>>        at
>>
>> org.mortbay.jetty.nio.SelectChannelConnector.access$100(SelectChannelConnector.java:64)
>>        at
>>
>> org.mortbay.jetty.nio.SelectChannelConnector$1.endPointClosed(SelectChannelConnector.java:92)
>>        at
>>
>> org.mortbay.io.nio.SelectChannelEndPoint.doUpdateKey(SelectChannelEndPoint.java:382)
>>        at
>>
>> org.mortbay.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:337)
>>        at
>> org.mortbay.io.nio.SelectorManager.doSelect(SelectorManager.java:166)
>>        at
>>
>> org.mortbay.jetty.nio.SelectChannelConnector.accept(SelectChannelConnector.java:124)
>>        at
>>
>> org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.java:537)
>>
>> I've tried the code listed in the url:
>>
>> http://old.nabble.com/Getting-IllegalAgrEception-while-Sending-BlobMessage-td17731668.html#a17731668
>>
>> But got the same error. Who knows how to fix the problem?
>>
>>
>> --
>> View this message in context:
>> http://old.nabble.com/Getting-IllegalAgrEception-while-Sending-BlobMessage-tp28237935p28237935.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
> 
> 
> -----
> Dejan Bosanac
> 
> Open Source Integration - http://fusesource.com/
> ActiveMQ in Action - http://www.manning.com/snyder/
> Blog - http://www.nighttale.net
> 

-- 
View this message in context: http://old.nabble.com/Getting-IllegalAgrEception-while-Sending-BlobMessage-tp28237935p28239844.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message