hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1576554 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/message/queue/
Date Wed, 12 Mar 2014 00:59:00 GMT
Author: edwardyoon
Date: Wed Mar 12 00:59:00 2014
New Revision: 1576554

URL: http://svn.apache.org/r1576554
Log:
HAMA-568: Add faster synchronized collections for message queues

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1576554&r1=1576553&r2=1576554&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Mar 12 00:59:00 2014
@@ -11,6 +11,8 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-568: Add faster synchronized collections for message queues (edwardyoon)
+
 Release 0.6.4 - Mar 5, 2014
 
   NEW FEATURES

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1576554&r1=1576553&r2=1576554&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Wed Mar 12 00:59:00 2014
@@ -17,9 +17,8 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-import java.util.ArrayDeque;
-import java.util.Deque;
 import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -28,9 +27,10 @@ import org.apache.hama.bsp.TaskAttemptID
 /**
  * LinkedList backed queue structure for bookkeeping messages.
  */
-public final class MemoryQueue<M extends Writable> extends POJOMessageQueue<M> {
+public final class MemoryQueue<M extends Writable> implements
+    SynchronizedQueue<M> {
 
-  private final Deque<M> deque = new ArrayDeque<M>();
+  private final ConcurrentLinkedQueue<M> deque = new ConcurrentLinkedQueue<M>();
   private Configuration conf;
 
   @Override
@@ -112,4 +112,9 @@ public final class MemoryQueue<M extends
   public boolean isMemoryBasedQueue() {
     return true;
   }
+
+  @Override
+  public MessageQueue<M> getMessageQueue() {
+    return this;
+  }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1576554&r1=1576553&r2=1576554&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Wed Mar 12 00:59:00 2014
@@ -181,8 +181,11 @@ public final class SingleLockQueue<T> im
   /*
    * static constructor methods to be type safe
    */
-
   public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
+    if(queue.isMemoryBasedQueue()) {
+      return (SynchronizedQueue<T>) queue;
+    }
+    
     return new SingleLockQueue<T>(queue);
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1576554&r1=1576553&r2=1576554&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java Wed Mar 12 00:59:00 2014
@@ -18,7 +18,8 @@
 package org.apache.hama.bsp.message.queue;
 
 import java.util.Iterator;
-import java.util.PriorityQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
@@ -31,9 +32,9 @@ import org.apache.hama.bsp.message.bundl
  * sorted receive and send.
  */
 public final class SortedMemoryQueue<M extends WritableComparable<M>>
-    implements MessageQueue<M>, BSPMessageInterface<M> {
+    implements SynchronizedQueue<M>, BSPMessageInterface<M> {
 
-  private final PriorityQueue<M> queue = new PriorityQueue<M>();
+  private final BlockingQueue<M> queue = new PriorityBlockingQueue<M>();
   private Configuration conf;
 
   @Override
@@ -94,7 +95,7 @@ public final class SortedMemoryQueue<M e
 
   @Override
   public void close() {
-    this.clear();;
+    this.clear();
   }
 
   @Override
@@ -122,4 +123,9 @@ public final class SortedMemoryQueue<M e
     return true;
   }
 
+  @Override
+  public MessageQueue<M> getMessageQueue() {
+    return this;
+  }
+
 }



Mime
View raw message