activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wxchc <wx...@126.com>
Subject Question about ActiveInputStream and ActiveMQOutputStream
Date Fri, 21 Jul 2006 01:24:10 GMT

Hi,everyone

    I just use ActiveMQ stream to transfer large file. When I start this
action in one or two thread,It works OK.But if I start more than 3 thread to
transfer data currently,after some data transfered,then the senders and
receiver threads all get waiting state. Please give some help or advices
about this. Thanks a lot.

   the following is my sender and receiver code. 
sender:
/**
 * ActivemqFileSender.java
 * Copyright(C) 2006 Agree Tech, All rights reserved.
 * Created on 2006-7-17 ??11:17:16 by wang
 */
package cn.com.agree.eai.file;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQOutputStream;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import cn.com.agree.tools.StringTool;

public class ActivemqFileSender implements Runnable {
	/** ?????? */
	private final static Log logger = LogFactory
			.getLog(ActivemqFileSender.class);

	public static String DEFAULT_MQ_URL = "tcp://127.0.0.1:61636";

	public static String SUBJECT_NAME = "FILE.TRANS";
	
	/** ???????????????? */
	public long averageSize;

	/** ??ID????id???????????? */
	public int id;
	/** ????????????? */
	public long blockSize;
	
	/**???????????? */
	public int bufferSize;

	/** ????????? */
	public File file;

	// ???activeMQ??????
	/** Active MQ ???? */
	public String url = DEFAULT_MQ_URL;

	/** ??????queue???????topic?? */
	public String name;

	/**
	 * true : topic false : queue
	 */
	public boolean isTopic = false;

	public String subject ;

	private Connection connection;

	private ActiveMQOutputStream outputStream;	

	/**
	 * ???????????id	 
	 * @param file ????
	 * @param id ??Id
	 * @param averageSize ??????????????
	 * @param blockSize ?????????????????averageSize
	 * @param bufferSize ?????
	 * @throws JMSException
	 */
	public ActivemqFileSender(File file, int id,long averageSize, long
blockSize,int bufferSize) {
		// TODO Auto-generated constructor stub
		this.file = file;
		this.id = id;
		this.averageSize=averageSize;
		this.blockSize = blockSize;
		this.bufferSize=bufferSize;
		
		this.subject= SUBJECT_NAME + id;
		try {
			init();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			if (logger.isDebugEnabled()) {
				logger.debug("?????? @ ???ActiveMQ????", e);
			}
			e.printStackTrace();
		}
	}
	
	/**
	 * ???ActiveMQ????
	 * 
	 * @throws JMSException
	 */
	public void init() throws JMSException {
		// create connection
		ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, url);
		connection = connectionFactory.createConnection();
//		connection.setClientID("FILE");
		connection.start();

		byte defaultType = isTopic ? ActiveMQDestination.TOPIC_TYPE
				: ActiveMQDestination.QUEUE_TYPE;
		ActiveMQDestination destination = ActiveMQDestination
				.createDestination(subject, defaultType);
		
		Map streamProperties = new HashMap();
        streamProperties.put("fileName", "test_file_trans");
        
		this.outputStream = (ActiveMQOutputStream) ((ActiveMQConnection)
connection)
				.createOutputStream(destination, null,
						DeliveryMode.NON_PERSISTENT,
						ActiveMQMessage.DEFAULT_PRIORITY,
						ActiveMQMessage.DEFAULT_TIME_TO_LIVE);

		if (logger.isDebugEnabled()) {
			logger.debug("?????? @ ???ActiveMQ?????subject=" + subject
					+ "defaultType=" + defaultType);
		}

	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see java.lang.Runnable#run()
	 */
	public void run() {
		if(logger.isDebugEnabled())
		{
			logger.debug("??id= "+this.id+" ??????????");
		}
		//???????????64K
		if(this.bufferSize<=0 )
		{
			this.bufferSize=64*1024;
		}
		if(this.bufferSize>this.averageSize)
		{
			this.bufferSize=(int)this.averageSize;
		}
		//?????
		RandomAccessFile raf=null;
		try {
			raf=new RandomAccessFile(file,"r");
		} catch (FileNotFoundException e) {
			// TODO Auto-generated catch block
			if(logger.isErrorEnabled()){
				logger.error("?????? @ ???????????"+this.file.getName());
			}
			e.printStackTrace();
			return;
		}
		
		byte[] buffer=new byte[bufferSize];
		int len;
		int totalLen=0;
		long pos=id*averageSize;
		try {
			raf.seek(pos);
		} catch (IOException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
		try {
			while((len=raf.read(buffer))!=-1 && totalLen<blockSize)
			{
				if(totalLen+len<=blockSize){
					outputStream.write(buffer,0,len);
				}else{
					outputStream.write(buffer,0,(int)(blockSize-totalLen));
				}
//				outputStream.flush();
				byte[] b= new byte[len];
				System.arraycopy(buffer, 0, b, 0, len);
//				System.out.println(StringTool.toHexTable(b));
				totalLen+=len;
				System.out.println("Thread id="+id+",sent bytes "+len+"
totalLen="+totalLen+" blockSize="+blockSize);
				System.out.println("file pointer position="+raf.getFilePointer());
				try {
					Thread.sleep(10);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			if(logger.isErrorEnabled()){
				logger.error("?????? @ ????????");
			}
			e.printStackTrace();
		}
		
		try {
			raf.close();
			outputStream.close();
			try {
				connection.close();
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				logger.error("?????? @???id?"+this.id+" ????????");
				e.printStackTrace();
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			logger.error("?????? @??????"+this.id+" ????????");
			e.printStackTrace();
		}
		

	}
	
	public static void main(String[] args){
		File file=new File("E:\\??????2004.RAR");
		long fileSize = file.length();
		
		int count=2;
		long averageSize=fileSize/count;
		for(int i=0;i<count;i++)
		{
			ActivemqFileSender afs=null;
			if(i<count-1){
				afs=new ActivemqFileSender(file,i,averageSize,averageSize,64*1024);
			}else{
				afs=new
ActivemqFileSender(file,i,averageSize,fileSize-(count-1)*averageSize,64*1024);
			}
			
			new Thread(afs).start();
//			try {
////				Thread.sleep(1000);
//			} catch (InterruptedException e) {
//				// TODO Auto-generated catch block
//				e.printStackTrace();
//			}
		}			
	}

}

receiver code:

/**
 * ActivemqFileReceiver.java
 * Copyright(C) 2006 Agree Tech, All rights reserved.
 * Created on 2006-7-18 ??10:18:13 by wang
 */
package cn.com.agree.eai.file;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQInputStream;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ActivemqFileReceiver implements Runnable {

	private static final Log logger = LogFactory
			.getLog(ActivemqFileReceiver.class);

	/** ???????????????? */
	public long averageSize;

	/** ??ID????id???????????? */
	public int id;

	/** ????????????? */
	public long blockSize;

	/** ???????????? */
	public int bufferSize;

	/** ????????? */
	public File file;

	// ???activeMQ??????
	/** Active MQ ???? */
	public String url = ActivemqFileSender.DEFAULT_MQ_URL;

	/** ??????queue???????topic?? */
	public String name;

	/**
	 * true : topic false : queue
	 */
	public boolean isTopic = false;

	public String subject;

	private Connection connection;

	private ActiveMQInputStream inputStream;

	/**
	 * ???????????ID
	 * 
	 * @param file
	 *            ????
	 * @param id
	 *            ??Id
	 * @param averageSize
	 *            ??????????????
	 * @param blockSize
	 *            ?????????????????averageSize
	 * @param bufferSize
	 *            ?????
	 * @throws JMSException
	 */
	public ActivemqFileReceiver(File file, int id, long averageSize,
			long blockSize, int bufferSize) {
		// TODO Auto-generated constructor stub
		this.file = file;
		this.id = id;
		this.averageSize = averageSize;
		this.blockSize = blockSize;
		this.bufferSize = bufferSize;

		this.subject = ActivemqFileSender.SUBJECT_NAME + id;
		try {
			init();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	public void init() throws JMSException {
		ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, url);

		connection = connectionFactory.createConnection();
//		connection.setClientID("FILE");
		connection.start();
		
//		Session session=connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
		

		byte defaultType = isTopic ? ActiveMQDestination.TOPIC_TYPE
				: ActiveMQDestination.QUEUE_TYPE;
		ActiveMQDestination destination = ActiveMQDestination
				.createDestination(subject, defaultType);
		
//		session.createConsumer(destination);

		inputStream = (ActiveMQInputStream) ((ActiveMQConnection) connection)
				.createInputStream(destination, null);

		if (logger.isDebugEnabled()) {
			logger.debug("?????? @ ???ActiveMQ?????subject=" + subject
					+ "defaultType=" + defaultType);
		}

	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see java.lang.Runnable#run()
	 */
	public void run() {
		if (logger.isDebugEnabled()) {
			logger.debug("??id= " + this.id + " ??????????");
		}
		// ???????????16K
		if (this.bufferSize <= 0) {
			this.bufferSize = 64 * 1024;
		}
		if (this.bufferSize > this.averageSize) {
			this.bufferSize = (int) this.averageSize;
		}
		// ?????
		RandomAccessFile raf = null;
		try {
			raf = new RandomAccessFile(file, "rwd");
		} catch (FileNotFoundException e) {
			// TODO Auto-generated catch block
			if (logger.isErrorEnabled()) {
				logger.error("?????? @ ???????????" + this.file.getName());
			}
			e.printStackTrace();
			try {
				raf.close();
			} catch (IOException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
			try {
				connection.close();
			} catch (JMSException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
			try {
				inputStream.close();
			} catch (IOException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
			return;
		}
		byte[] buffer = new byte[bufferSize];
		int len, totalLen = 0;
		long pos = id * averageSize;
		try {
			raf.seek(pos);
		} catch (IOException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
		try {
			while ((len = inputStream.read(buffer, 0, bufferSize)) != -1
					&& totalLen < blockSize) {
				byte[] b = new byte[len];
				System.arraycopy(buffer, 0, b, 0, len);
				// System.out.println(new String(b));
				raf.write(b);
				totalLen += len;

//				System.out.println(StringTool.toHexTable(b));
				System.out.println("Thread id="+id+",received bytes "+len+"
totalLen="+totalLen+" blockSize="+blockSize);
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			if (logger.isErrorEnabled()) {
				logger.error("?????? @ ????????");
			}
			e.printStackTrace();
		}
		try {
			raf.close();
			inputStream.close();
			try {
				connection.close();
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				logger.error("?????? @???id?" + this.id + " ????????");
				e.printStackTrace();
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			logger.error("?????? @??????" + this.id + " ????????");
			e.printStackTrace();
		}

	}

	public static void main(String[] args) throws IOException {
		File file = new File("d:\\test.rar");
		File f = new File(
				"E:\\??????2004.RAR");
		RandomAccessFile raf = new RandomAccessFile(file, "rwd");
		raf.setLength(f.length());
		int count=2;
		long fileSize = f.length();
		long averageSize = fileSize / count;

		int buffSize = 64 * 1024;
		for (int i = 0; i < count; i++) {
			ActivemqFileReceiver afr = null;
			if (i < count-1) {
				afr = new ActivemqFileReceiver(file, i, averageSize,
						averageSize, buffSize);
			} else {
				afr = new ActivemqFileReceiver(file, i, averageSize, fileSize
						- (count-1) * averageSize, buffSize);
			}

			new Thread(afr).start();
			// try {
			// // Thread.sleep(1000);
			// } catch (InterruptedException e) {
			// // TODO Auto-generated catch block
			// e.printStackTrace();
			// }
		}
		raf.close();
	}

}



-- 
View this message in context: http://www.nabble.com/Question-about-ActiveInputStream-and-ActiveMQOutputStream-tf1978079.html#a5427288
Sent from the ActiveMQ - User forum at Nabble.com.


Mime
View raw message