activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lxgao <lxgao2...@gmail.com>
Subject No-Durable topic consumer OutofMemory solution
Date Fri, 27 Jul 2007 02:07:18 GMT

Hi all:
    Last day I post a bug about No-Durable topic consumer OutofMemory
Problem, see at https://issues.apache.org/activemq/browse/AMQ-1325.
    This day I found the cause is MessageDispatchChannel has no max
capacity, if the procuder send message faster and the consumer receiver
slower , the LinkedList in MessageDispatchChannel class would increase until
OutofMemory. 
   I write two MessageDispatchChannel , one use only bloking lock , another
use two bloking lock(Get lock ang Put lock). The code is below:

1.  use only bloking lock 
public class MessageDispatchChannel {

    //private final Object mutex = new Object();
    private final LinkedList<MessageDispatch> list;
    private AtomicBoolean closed=new AtomicBoolean(false);
    private AtomicBoolean running=new AtomicBoolean(false);
    private final int capacity;
    
    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);
    
    /** Lock held by take, poll, etc */
    private final ReentrantLock lock;

    /** Wait queue for waiting takes */
    private final Condition notEmpty;
     /** Wait queue for waiting puts */
    private final Condition notFull;

    public MessageDispatchChannel() {
        this.list = new LinkedList<MessageDispatch>();
        this.capacity=Integer.MAX_VALUE;
        lock = new ReentrantLock(false);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    public MessageDispatchChannel(int capacity) {
    	if (capacity <= 0) throw new IllegalArgumentException();
        this.list = new LinkedList<MessageDispatch>();
        this.capacity=capacity;
        lock = new ReentrantLock(false);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public void enqueue(MessageDispatch message) throws
InterruptedException{
    	if (closed.get() ||!running.get()){
    		return ;
    	}
//    	synchronized(mutex) {
//        	if (list.size()>=this.capacity){
//        		try {
//        			System.out.println(" overflow.");
//					mutex.wait(1000);
//				} catch (InterruptedException e) {
//				}
//        	}
//        	list.addLast(message);
//            mutex.notify();
//        }
    	if (message == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;        
        final ReentrantLock lock = this.lock;
        final AtomicInteger count = this.count;
        lock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            list.addLast(message);
            c = count.getAndIncrement();
            notEmpty.signal();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            lock.unlock();
        }    
    	
    }

    public void enqueueFirst(MessageDispatch message) throws
InterruptedException{
    	if (closed.get() ||!running.get()){
    		return ;
    	}
//        synchronized(mutex) {
//        	if (list.size()>=this.capacity){
//        		try {
//					mutex.wait(1000);
//				} catch (InterruptedException e) {
//				}
//        	}
//            list.addFirst(message);
//            mutex.notify();
//        }
        if (message == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock lock = this.lock;
        final AtomicInteger count = this.count;
        lock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            list.addFirst(message);
            c = count.getAndIncrement();
            notEmpty.signal();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
        	lock.unlock();
        }
    }

    public boolean isEmpty() {
//        synchronized(mutex) {
//            return list.isEmpty();
//        }
    	return count.get()==0;
    }

    /**
     * Used to get an enqueued message. 
     * The amount of time this method blocks is based on the timeout value. 
     * - if timeout==-1 then it blocks until a message is received. 
     * - if timeout==0 then it it tries to not block at all, it returns a
message if it is available 
     * - if timeout>0 then it blocks up to timeout amount of time.
     * 
     * Expired messages will consumed by this method.  
     * 
     * @throws JMSException 
     * 
     * @return null if we timeout or if the consumer is closed.
     * @throws InterruptedException 
     */
    public MessageDispatch dequeue(long timeout) throws InterruptedException
{
//        synchronized (mutex) {
//            // Wait until the consumer is ready to deliver messages.
//            while(timeout != 0 && !closed && (list.isEmpty() || !running))
{
//                if (timeout == -1) {
//                    mutex.wait();
//                } else {
//                    mutex.wait(timeout);
//                    break;
//                }
//            }
//            if (closed || !running || list.isEmpty()) {
//                return null;
//            }
//            return list.removeFirst();
//        }
    	if (closed.get() ||!running.get()){
    		return null;
    	}
    	MessageDispatch x = null;
        int c = -1;
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = list.poll();
                    c = count.getAndDecrement();
                    notFull.signal();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted
thread
                    throw ie;
                }
            }
        } finally {
        	lock.unlock();
        }
        return x;
    }
    
    public MessageDispatch dequeueNoWait() {
    	if (closed.get() ||!running.get()){
    		return null;
    	}
//        synchronized (mutex) {
//            if (closed || !running || list.isEmpty()) {
//                return null;
//            }
//            return list.removeFirst();
//        }
    	final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        MessageDispatch x = null;
        int c = -1;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count.get() > 0) {
                x =list.poll();
                c = count.getAndDecrement();
                notFull.signal();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
        	lock.unlock();
        }
        return x;
    }
    
    public MessageDispatch peek() {
    	if (closed.get() ||!running.get()){
    		return null;
    	}
//        synchronized (mutex) {
//            if (closed || !running || list.isEmpty()) {
//                return null;
//            }
//            return list.getFirst();
//        }    	
    	final ReentrantLock lock = this.lock;
    	lock.lock();
    	final AtomicInteger count = this.count;
    	if (count.get() == 0)
            return null;
        try {
        	return list.peek();
        } finally {
        	lock.unlock();
        }
    }

    

    public void clear() {
//        synchronized(mutex) {
//            list.clear();
//        }
    	final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        	list.clear();
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
        	lock.unlock();
        }
    }

    public boolean isClosed() {
        return closed.get();
    }

    public int size() {
//        synchronized(mutex) {
//            return list.size();
//        }
    	return count.get();
    }
    
    public int remainingCapacity() {
        return capacity - count.get();
    }
    
    public List<MessageDispatch> removeAll() {
//        synchronized(mutex) {
//            ArrayList <MessageDispatch>rc = new
ArrayList<MessageDispatch>(list);
//            list.clear();
//            return rc;
//        }
    	final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        	ArrayList <MessageDispatch>rc = new
ArrayList<MessageDispatch>(list);
        	list.clear();
        	if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        	return rc;
        } finally {
        	lock.unlock();
        }
    }

    public String toString() {
//        synchronized(mutex) {
//            return list.toString();
//        }
    	final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return list.toString();
        } finally {
        	lock.unlock();
        }
    }
    
    public void start() {
//        synchronized (mutex) {
//            running = true;
//            mutex.notifyAll();
//        }
    	final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        	count.set(0);
        	running.compareAndSet(false, true);
        } finally {
        	lock.unlock();
        }
    }

    public void stop() {
//        synchronized (mutex) {
//            running = false;
//            mutex.notifyAll();
//        }
    	final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        	count.set(0);
        	running.compareAndSet(true, false);
        } finally {
        	lock.unlock();
        }
    }

    public void close() {
//        synchronized (mutex) {
//            if (!closed) {
//                running = false;
//                closed = true;
//            }
//            mutex.notifyAll();
//        }
    	final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        	count.set(0);
        	running.compareAndSet(true, false);
        	closed.compareAndSet(false, true);
        } finally {
        	lock.unlock();
        }
    }

    //public Object getMutex() {
    //    return mutex;
    //}

    public boolean isRunning() {
        return running.get();
    }  
    
    public ReentrantLock getLock(){
    	return this.lock;
    }
    
}

2.use two bloking lock(Get lock ang Put lock)
public class MesssageDispatchChannel<E> extends AbstractQueue<E>
		implements BlockingQueue<E>, java.io.Serializable {
	/*
	 * A variant of the "two lock queue" algorithm. The putLock gates entry to
	 * put (and offer), and has an associated condition for waiting puts.
	 * Similarly for the takeLock. The "count" field that they both rely on is
	 * maintained as an atomic to avoid needing to get both locks in most
cases.
	 * Also, to minimize need for puts to get takeLock and vice-versa,
cascading
	 * notifies are used. When a put notices that it has enabled at least one
	 * take, it signals taker. That taker in turn signals others if more items
	 * have been entered since the signal. And symmetrically for takes
	 * signalling puts. Operations such as remove(Object) and iterators acquire
	 * both locks.
	 */

	/**
	 * 
	 */
	private static final long serialVersionUID = 1330823692422658197L;

	/**
	 * Linked list node class
	 */
	static class Node<E> {
		/** The item, volatile to ensure barrier separating write and read */
		volatile E item;
		Node<E> next;

		Node(E x) {
			item = x;
		}
	}

	/** The capacity bound, or Integer.MAX_VALUE if none */
	private final int capacity;

	/** Current number of elements */
	private final AtomicInteger count = new AtomicInteger(0);

	/** Head of linked list */
	private transient Node<E> head;

	/** Tail of linked list */
	private transient Node<E> last;

	/** Lock held by take, poll, etc */
	private final ReentrantLock takeLock = new ReentrantLock();

	/** Wait queue for waiting takes */
	private final Condition notEmpty = takeLock.newCondition();

	/** Lock held by put, offer, etc */
	private final ReentrantLock putLock = new ReentrantLock();

	/** Wait queue for waiting puts */
	private final Condition notFull = putLock.newCondition();

	private AtomicBoolean closed = new AtomicBoolean(false);
	private AtomicBoolean running = new AtomicBoolean(false);

	public boolean enqueue(E o) throws InterruptedException {
		if (closed.get() || !running.get()) {
			return false;
		}
		return this.offer(o);
	}

	public boolean enqueueFirst(E o) throws InterruptedException {
		if (closed.get() || !running.get()) {
			return false;
		}

		if (o == null)
			throw new NullPointerException();
		final AtomicInteger count = this.count;
		if (count.get() == capacity)
			return false;
		int c = -1;
		final ReentrantLock putLock = this.putLock;
		putLock.lock();
		try {
			if (count.get() < capacity) {
				addFirst(o);
				c = count.getAndIncrement();
				if (c + 1 < capacity)
					notFull.signal();
			}
		} finally {
			putLock.unlock();
		}
		if (c == 0)
			signalNotEmpty();
		return c >= 0;
	}

	public boolean isEmpty() {
		final AtomicInteger count = this.count;
		return count.get() == 0;
	}

	public E dequeue(long timeout) throws InterruptedException {
		if (closed.get() || !running.get()) {
			return null;
		}
		return this.poll(timeout, TimeUnit.MILLISECONDS);
	}

	public E dequeueNoWait() {
		if (closed.get() || !running.get()) {
			return null;
		}
		return this.poll();
	}	

	public List<E> removeAll() {
		fullyLock();
		try {
			int size = count.get();
			ArrayList<E> rc = new ArrayList<E>(size);
			for (Node<E> p = head.next; p != null; p = p.next) {
				rc.add(p.item);
			}
			head.next = null;
			assert head.item == null;
			last = head;
			if (count.getAndSet(0) == capacity)
				notFull.signalAll();
			return rc;
		} finally {
			fullyUnlock();
		}
	}

	public void start() {
		fullyLock();
		try {
			count.set(0);
			running.compareAndSet(false, true);
		} finally {
			fullyUnlock();
		}
	}

	public void stop() {
		fullyLock();
		try {
			count.set(0);
			running.compareAndSet(true, false);
		} finally {
			fullyUnlock();
		}
	}

	public void close() {
		fullyLock();
		try {
			count.set(0);
			running.compareAndSet(true, false);
			closed.compareAndSet(false, true);
		} finally {
			fullyUnlock();
		}
	}
	
	public boolean isClosed() {
		return closed.get();
	}

	public boolean isRunning() {
		return running.get();
	}

	/**
	 * Signal a waiting take. Called only from put/offer (which do not
otherwise
	 * ordinarily lock takeLock.)
	 */
	private void signalNotEmpty() {
		final ReentrantLock takeLock = this.takeLock;
		takeLock.lock();
		try {
			notEmpty.signal();
		} finally {
			takeLock.unlock();
		}
	}

	/**
	 * Signal a waiting put. Called only from take/poll.
	 */
	private void signalNotFull() {
		final ReentrantLock putLock = this.putLock;
		putLock.lock();
		try {
			notFull.signal();
		} finally {
			putLock.unlock();
		}
	}

	/**
	 * Create a node and link it at end of queue
	 * 
	 * @param x
	 *            the item
	 */
	private void addFirst(E x) {
		Node<E> node = new Node<E>(x);
		node.next = head;
		head = node;
	}

	/**
	 * Create a node and link it at end of queue
	 * 
	 * @param x
	 *            the item
	 */
	private void insert(E x) {
		last = last.next = new Node<E>(x);
	}

	/**
	 * Remove a node from head of queue,
	 * 
	 * @return the node
	 */
	private E extract() {
		Node<E> first = head.next;
		head = first;
		E x = first.item;
		first.item = null;
		return x;
	}

	/**
	 * Lock to prevent both puts and takes.
	 */
	protected void fullyLock() {
		putLock.lock();
		takeLock.lock();
	}

	/**
	 * Unlock to allow both puts and takes.
	 */
	protected void fullyUnlock() {
		takeLock.unlock();
		putLock.unlock();
	}

	/**
	 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
	 * {@link Integer#MAX_VALUE}.
	 */
	public MesssageDispatchChannel() {
		this(Integer.MAX_VALUE);
	}

	/**
	 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
	 * 
	 * @param capacity
	 *            the capacity of this queue.
	 * @throws IllegalArgumentException
	 *             if <tt>capacity</tt> is not greater than zero.
	 */
	public MesssageDispatchChannel(int capacity) {
		if (capacity <= 0)
			throw new IllegalArgumentException();
		this.capacity = capacity;
		last = head = new Node<E>(null);
	}

	/**
	 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
	 * {@link Integer#MAX_VALUE}, initially containing the elements of the
	 * given collection, added in traversal order of the collection's iterator.
	 * 
	 * @param c
	 *            the collection of elements to initially contain
	 * @throws NullPointerException
	 *             if <tt>c</tt> or any element within it is <tt>null</tt>
	 */
	public MesssageDispatchChannel(Collection<? extends E> c) {
		this(Integer.MAX_VALUE);
		for (E e : c)
			add(e);
	}

	// this doc comment is overridden to remove the reference to collections
	// greater in size than Integer.MAX_VALUE
	/**
	 * Returns the number of elements in this queue.
	 * 
	 * @return the number of elements in this queue.
	 */
	public int size() {
		return count.get();
	}

	// this doc comment is a modified copy of the inherited doc comment,
	// without the reference to unlimited queues.
	/**
	 * Returns the number of elements that this queue can ideally (in the
	 * absence of memory or resource constraints) accept without blocking. This
	 * is always equal to the initial capacity of this queue less the current
	 * <tt>size</tt> of this queue.
	 * <p>
	 * Note that you <em>cannot</em> always tell if an attempt to <tt>add</tt>
	 * an element will succeed by inspecting <tt>remainingCapacity</tt>
	 * because it may be the case that a waiting consumer is ready to
	 * <tt>take</tt> an element out of an otherwise full queue.
	 */
	public int remainingCapacity() {
		return capacity - count.get();
	}

	/**
	 * Adds the specified element to the tail of this queue, waiting if
	 * necessary for space to become available.
	 * 
	 * @param o
	 *            the element to add
	 * @throws InterruptedException
	 *             if interrupted while waiting.
	 * @throws NullPointerException
	 *             if the specified element is <tt>null</tt>.
	 */
	public void put(E o) throws InterruptedException {
		if (o == null)
			throw new NullPointerException();
		// Note: convention in all put/take/etc is to preset
		// local var holding count negative to indicate failure unless set.
		int c = -1;
		final ReentrantLock putLock = this.putLock;
		final AtomicInteger count = this.count;
		putLock.lockInterruptibly();
		try {
			/*
			 * Note that count is used in wait guard even though it is not
			 * protected by lock. This works because count can only decrease at
			 * this point (all other puts are shut out by lock), and we (or some
			 * other waiting put) are signalled if it ever changes from
			 * capacity. Similarly for all other uses of count in other wait
			 * guards.
			 */
			try {
				while (count.get() == capacity)
					notFull.await();
			} catch (InterruptedException ie) {
				notFull.signal(); // propagate to a non-interrupted thread
				throw ie;
			}
			insert(o);
			c = count.getAndIncrement();
			if (c + 1 < capacity)
				notFull.signal();
		} finally {
			putLock.unlock();
		}
		if (c == 0)
			signalNotEmpty();
	}

	/**
	 * Inserts the specified element at the tail of this queue, waiting if
	 * necessary up to the specified wait time for space to become available.
	 * 
	 * @param o
	 *            the element to add
	 * @param timeout
	 *            how long to wait before giving up, in units of <tt>unit</tt>
	 * @param unit
	 *            a <tt>TimeUnit</tt> determining how to interpret the
	 *            <tt>timeout</tt> parameter
	 * @return <tt>true</tt> if successful, or <tt>false</tt> if the
	 *         specified waiting time elapses before space is available.
	 * @throws InterruptedException
	 *             if interrupted while waiting.
	 * @throws NullPointerException
	 *             if the specified element is <tt>null</tt>.
	 */
	public boolean offer(E o, long timeout, TimeUnit unit)
			throws InterruptedException {

		if (o == null)
			throw new NullPointerException();
		long nanos = unit.toNanos(timeout);
		int c = -1;
		final ReentrantLock putLock = this.putLock;
		final AtomicInteger count = this.count;
		putLock.lockInterruptibly();
		try {
			for (;;) {
				if (count.get() < capacity) {
					insert(o);
					c = count.getAndIncrement();
					if (c + 1 < capacity)
						notFull.signal();
					break;
				}
				if (nanos <= 0)
					return false;
				try {
					nanos = notFull.awaitNanos(nanos);
				} catch (InterruptedException ie) {
					notFull.signal(); // propagate to a non-interrupted thread
					throw ie;
				}
			}
		} finally {
			putLock.unlock();
		}
		if (c == 0)
			signalNotEmpty();
		return true;
	}

	/**
	 * Inserts the specified element at the tail of this queue if possible,
	 * returning immediately if this queue is full.
	 * 
	 * @param o
	 *            the element to add.
	 * @return <tt>true</tt> if it was possible to add the element to this
	 *         queue, else <tt>false</tt>
	 * @throws NullPointerException
	 *             if the specified element is <tt>null</tt>
	 */
	public boolean offer(E o) {
		if (o == null)
			throw new NullPointerException();
		final AtomicInteger count = this.count;
		if (count.get() == capacity)
			return false;
		int c = -1;
		final ReentrantLock putLock = this.putLock;
		putLock.lock();
		try {
			if (count.get() < capacity) {
				insert(o);
				c = count.getAndIncrement();
				if (c + 1 < capacity)
					notFull.signal();
			}
		} finally {
			putLock.unlock();
		}
		if (c == 0)
			signalNotEmpty();
		return c >= 0;
	}

	public E take() throws InterruptedException {
		E x;
		int c = -1;
		final AtomicInteger count = this.count;
		final ReentrantLock takeLock = this.takeLock;
		takeLock.lockInterruptibly();
		try {
			try {
				while (count.get() == 0)
					notEmpty.await();
			} catch (InterruptedException ie) {
				notEmpty.signal(); // propagate to a non-interrupted thread
				throw ie;
			}

			x = extract();
			c = count.getAndDecrement();
			if (c > 1)
				notEmpty.signal();
		} finally {
			takeLock.unlock();
		}
		if (c == capacity)
			signalNotFull();
		return x;
	}

	public E poll(long timeout, TimeUnit unit) throws InterruptedException {
		E x = null;
		int c = -1;
		long nanos = unit.toNanos(timeout);
		final AtomicInteger count = this.count;
		final ReentrantLock takeLock = this.takeLock;
		takeLock.lockInterruptibly();
		try {
			for (;;) {
				if (count.get() > 0) {
					x = extract();
					c = count.getAndDecrement();
					if (c > 1)
						notEmpty.signal();
					break;
				}
				if (nanos <= 0)
					return null;
				try {
					nanos = notEmpty.awaitNanos(nanos);
				} catch (InterruptedException ie) {
					notEmpty.signal(); // propagate to a non-interrupted thread
					throw ie;
				}
			}
		} finally {
			takeLock.unlock();
		}
		if (c == capacity)
			signalNotFull();
		return x;
	}

	public E poll() {
		final AtomicInteger count = this.count;
		if (count.get() == 0)
			return null;
		E x = null;
		int c = -1;
		final ReentrantLock takeLock = this.takeLock;
		takeLock.lock();
		try {
			if (count.get() > 0) {
				x = extract();
				c = count.getAndDecrement();
				if (c > 1)
					notEmpty.signal();
			}
		} finally {
			takeLock.unlock();
		}
		if (c == capacity)
			signalNotFull();
		return x;
	}

	public E peek() {
		if (count.get() == 0)
			return null;
		final ReentrantLock takeLock = this.takeLock;
		takeLock.lock();
		try {
			Node<E> first = head.next;
			if (first == null)
				return null;
			else
				return first.item;
		} finally {
			takeLock.unlock();
		}
	}

	/**
	 * Removes a single instance of the specified element from this queue, if
it
	 * is present.
	 */
	public boolean remove(Object o) {
		if (o == null)
			return false;
		boolean removed = false;
		fullyLock();
		try {
			Node<E> trail = head;
			Node<E> p = head.next;
			while (p != null) {
				if (o.equals(p.item)) {
					removed = true;
					break;
				}
				trail = p;
				p = p.next;
			}
			if (removed) {
				p.item = null;
				trail.next = p.next;
				if (last == p)
					last = trail;
				if (count.getAndDecrement() == capacity)
					notFull.signalAll();
			}
		} finally {
			fullyUnlock();
		}
		return removed;
	}

	public Object[] toArray() {
		fullyLock();
		try {
			int size = count.get();
			Object[] a = new Object[size];
			int k = 0;
			for (Node<E> p = head.next; p != null; p = p.next)
				a[k++] = p.item;
			return a;
		} finally {
			fullyUnlock();
		}
	}

	public <T> T[] toArray(T[] a) {
		fullyLock();
		try {
			int size = count.get();
			if (a.length < size)
				a = (T[]) java.lang.reflect.Array.newInstance(a.getClass()
						.getComponentType(), size);

			int k = 0;
			for (Node p = head.next; p != null; p = p.next)
				a[k++] = (T) p.item;
			if (a.length > k)
				a[k] = null;
			return a;
		} finally {
			fullyUnlock();
		}
	}

	public String toString() {
		fullyLock();
		try {
			return super.toString();
		} finally {
			fullyUnlock();
		}
	}

	/**
	 * Atomically removes all of the elements from this queue. The queue will
be
	 * empty after this call returns.
	 */
	public void clear() {
		fullyLock();
		try {
			head.next = null;
			assert head.item == null;
			last = head;
			if (count.getAndSet(0) == capacity)
				notFull.signalAll();
		} finally {
			fullyUnlock();
		}
	}

	public int drainTo(Collection<? super E> c) {
		if (c == null)
			throw new NullPointerException();
		if (c == this)
			throw new IllegalArgumentException();
		Node first;
		fullyLock();
		try {
			first = head.next;
			head.next = null;
			assert head.item == null;
			last = head;
			if (count.getAndSet(0) == capacity)
				notFull.signalAll();
		} finally {
			fullyUnlock();
		}
		// Transfer the elements outside of locks
		int n = 0;
		for (Node<E> p = first; p != null; p = p.next) {
			c.add(p.item);
			p.item = null;
			++n;
		}
		return n;
	}

	public int drainTo(Collection<? super E> c, int maxElements) {
		if (c == null)
			throw new NullPointerException();
		if (c == this)
			throw new IllegalArgumentException();
		fullyLock();
		try {
			int n = 0;
			Node<E> p = head.next;
			while (p != null && n < maxElements) {
				c.add(p.item);
				p.item = null;
				p = p.next;
				++n;
			}
			if (n != 0) {
				head.next = p;
				assert head.item == null;
				if (p == null)
					last = head;
				if (count.getAndAdd(-n) == capacity)
					notFull.signalAll();
			}
			return n;
		} finally {
			fullyUnlock();
		}
	}

	/**
	 * Returns an iterator over the elements in this queue in proper sequence.
	 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
	 * will never throw {@link java.util.ConcurrentModificationException}, and
	 * guarantees to traverse elements as they existed upon construction of the
	 * iterator, and may (but is not guaranteed to) reflect any modifications
	 * subsequent to construction.
	 * 
	 * @return an iterator over the elements in this queue in proper sequence.
	 */
	public Iterator<E> iterator() {
		return new Itr();
	}

	private class Itr implements Iterator<E> {
		/*
		 * Basic weak-consistent iterator. At all times hold the next item to
		 * hand out so that if hasNext() reports true, we will still have it to
		 * return even if lost race with a take etc.
		 */
		private Node<E> current;
		private Node<E> lastRet;
		private E currentElement;

		Itr() {
			final ReentrantLock putLock = MesssageDispatchChannel.this.putLock;
			final ReentrantLock takeLock = MesssageDispatchChannel.this.takeLock;
			putLock.lock();
			takeLock.lock();
			try {
				current = head.next;
				if (current != null)
					currentElement = current.item;
			} finally {
				takeLock.unlock();
				putLock.unlock();
			}
		}

		public boolean hasNext() {
			return current != null;
		}

		public E next() {
			final ReentrantLock putLock = MesssageDispatchChannel.this.putLock;
			final ReentrantLock takeLock = MesssageDispatchChannel.this.takeLock;
			putLock.lock();
			takeLock.lock();
			try {
				if (current == null)
					throw new NoSuchElementException();
				E x = currentElement;
				lastRet = current;
				current = current.next;
				if (current != null)
					currentElement = current.item;
				return x;
			} finally {
				takeLock.unlock();
				putLock.unlock();
			}
		}

		public void remove() {
			if (lastRet == null)
				throw new IllegalStateException();
			final ReentrantLock putLock = MesssageDispatchChannel.this.putLock;
			final ReentrantLock takeLock = MesssageDispatchChannel.this.takeLock;
			putLock.lock();
			takeLock.lock();
			try {
				Node<E> node = lastRet;
				lastRet = null;
				Node<E> trail = head;
				Node<E> p = head.next;
				while (p != null && p != node) {
					trail = p;
					p = p.next;
				}
				if (p == node) {
					p.item = null;
					trail.next = p.next;
					if (last == p)
						last = trail;
					int c = count.getAndDecrement();
					if (c == capacity)
						notFull.signalAll();
				}
			} finally {
				takeLock.unlock();
				putLock.unlock();
			}
		}
	}

	/**
	 * Save the state to a stream (that is, serialize it).
	 * 
	 * @serialData The capacity is emitted (int), followed by all of its
	 *             elements (each an <tt>Object</tt>) in the proper order,
	 *             followed by a null
	 * @param s
	 *            the stream
	 */
	private void writeObject(java.io.ObjectOutputStream s)
			throws java.io.IOException {

		fullyLock();
		try {
			// Write out any hidden stuff, plus capacity
			s.defaultWriteObject();

			// Write out all elements in the proper order.
			for (Node<E> p = head.next; p != null; p = p.next)
				s.writeObject(p.item);

			// Use trailing null as sentinel
			s.writeObject(null);
		} finally {
			fullyUnlock();
		}
	}

	/**
	 * Reconstitute this queue instance from a stream (that is, deserialize
it).
	 * 
	 * @param s
	 *            the stream
	 */
	private void readObject(java.io.ObjectInputStream s)
			throws java.io.IOException, ClassNotFoundException {
		// Read in capacity, and any hidden stuff
		s.defaultReadObject();

		count.set(0);
		last = head = new Node<E>(null);

		// Read in all elements and place in queue
		for (;;) {
			E item = (E) s.readObject();
			if (item == null)
				break;
			add(item);
		}
	}
}

-- 
View this message in context: http://www.nabble.com/No-Durable-topic-consumer-OutofMemory-solution-tf4155101s2354.html#a11822109
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Mime
View raw message