hadoop-common-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Hiroshi Ikeda (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HADOOP-10278) Refactor to make CallQueue pluggable
Date Fri, 14 Feb 2014 02:07:20 GMT

    [ https://issues.apache.org/jira/browse/HADOOP-10278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13901017#comment-13901017
] 

Hiroshi Ikeda commented on HADOOP-10278:
----------------------------------------

Added here my image before I forget:

{code}
class CallQueue {
	final AtomicInteger sizeRef = new ...
	final Semaphore putSemaphore;
	final Semaphore takeSemaphore = new Semaphore(0);
	final ReadWriteLock internalQueueLock = new ...
	/** The reference is guarded by internalQueueLock. */
	InternalQueue internalQueue;

	CallQueue(int maxQueueSize, IntenralQueue initInternalQueue) {
		putSemaphore = new Semaphore(maxQueueSize);
		intenralQueue = initInteranlQueue;
	}

	/** Implementation must be thread safe. */
	interface InternalQueue {
		/** Returns null if no element. */
		Call poll();
		void offer(Call);
	}

	void replaceInternalQueue(InternalQueue internalQueue) {
		internalQueueLock.writeLock().lock();
		try {
			Call call;
			while((call = this.internalQueue.poll()) != null) {
				interanlQueue.offer(call);
			}
			this.internalQueue = internalQueue;
		} finally {
			intenralQueueLock.writeLock.release();
		}
	}

	void put(Call call) throws InterruptedException {
		putSemaphore.aquire();
		interalQueueLock.readLock().lock();
		try {
			internalQueue.offer(call);
		} finally {
			internalQueueLock.readLock().release();
		}
		sizeRef.incrementAndGet();
		takeSemaphore.release();
	}

	Call take() throws InterruptedException {
		Call result;
		takeSemaphore.aquire();
		interalQueueLock.readLock().lock();
		try {
			result = internalQueue.poll();
		} finally {
			internalQueueLock.readLock().release();
		}
		sizeRef.decrementAndGet();
		putSemaphore.release();
		return result;
	}
		
	int size() {
		return sizeRef.get();
	}
}

class InternalQueueSimpleImpl implements InternalQueue {
	final ConcurrentLinkedQueue queue = new ...
	@Override Call poll() { return queue.poll(); }
	@Override void offer(Call call) { queue.offer(call); }
}
{code}

> Refactor to make CallQueue pluggable
> ------------------------------------
>
>                 Key: HADOOP-10278
>                 URL: https://issues.apache.org/jira/browse/HADOOP-10278
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: ipc
>            Reporter: Chris Li
>         Attachments: HADOOP-10278-atomicref-adapter.patch, HADOOP-10278-atomicref-rwlock.patch,
HADOOP-10278-atomicref.patch, HADOOP-10278-atomicref.patch, HADOOP-10278-atomicref.patch,
HADOOP-10278-atomicref.patch, HADOOP-10278.patch, HADOOP-10278.patch
>
>
> * Refactor CallQueue into an interface, base, and default implementation that matches
today's behavior
> * Make the call queue impl configurable, keyed on port so that we minimize coupling



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)

Mime
View raw message