hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1673339 - in /hama/trunk: core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/queue/ core/src/test/java/org/apache/hama/bsp/ graph/src/main/java/org/apache/...
Date Tue, 14 Apr 2015 02:19:08 GMT
Author: edwardyoon
Date: Tue Apr 14 02:19:07 2015
New Revision: 1673339

URL: http://svn.apache.org/r1673339
Log:
Use thread pool

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Tue Apr 14 02:19:07 2015
@@ -204,8 +204,4 @@ public interface BSPPeer<K1, V1, K2, V2,
    */
   public TaskAttemptID getTaskId();
 
-  public List<List<M>> getSubLists(int num);
-
-  public void clearIncomingMessages();
-
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Apr 14 02:19:07 2015
@@ -512,11 +512,6 @@ public final class BSPPeerImpl<K1, V1, K
     messenger.clearOutgoingMessages();
   }
   
-  @Override
-  public final void clearIncomingMessages() {
-    messenger.clearIncomingMessages();
-  }
-
   /**
    * @return the string as host:port of this Peer
    */
@@ -673,9 +668,4 @@ public final class BSPPeerImpl<K1, V1, K
     return taskId;
   }
 
-  @Override
-  public List<List<M>> getSubLists(int num) {
-    return messenger.getSubLists(num);
-  }
-
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Tue Apr 14 02:19:07 2015
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.Queue;
 
@@ -122,11 +121,6 @@ public abstract class AbstractMessageMan
     return localQueue.poll();
   }
 
-  @Override
-  public List<List<M>> getSubLists(int num) {
-    return localQueue.getSubLists(num);
-  }
-  
   /*
    * (non-Javadoc)
    * @see org.apache.hama.bsp.message.MessageManager#getNumCurrentMessages()

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java Tue Apr 14 02:19:07 2015
@@ -36,9 +36,9 @@ import org.apache.hama.bsp.BSPMessageBun
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.ipc.AsyncRPC;
 import org.apache.hama.ipc.AsyncServer;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.util.LRUCache;
 
 /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java Tue Apr 14 02:19:07 2015
@@ -24,8 +24,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Tue Apr 14 02:19:07 2015
@@ -20,7 +20,6 @@ package org.apache.hama.bsp.message;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.io.Writable;
@@ -92,11 +91,6 @@ public interface MessageManager<M extend
   public void clearOutgoingMessages();
 
   /**
-   * Clears the incoming queue. Can be used to switch queues.
-   */
-  public void clearIncomingMessages();
-  
-  /**
    * Gets the number of messages in the current queue.
    * 
    */
@@ -129,5 +123,4 @@ public interface MessageManager<M extend
    */
   public InetSocketAddress getListenerAddress();
 
-  public List<List<M>> getSubLists(int num);
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Tue Apr 14 02:19:07 2015
@@ -116,15 +116,4 @@ public final class MemoryQueue<M extends
     return this;
   }
 
-  @Override
-  public List<List<M>> getSubLists(int num) {
-    List<List<M>> subLists = new ArrayList<List<M>>();
-    subLists.add(Lists.newArrayList(deque.iterator()));
-    Iterator<BSPMessageBundle<M>> it = bundles.iterator();
-    while (it.hasNext()) {
-      subLists.add(Lists.newArrayList(it.next().iterator()));
-    }
-    return subLists;
-  }
-
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Tue Apr 14 02:19:07 2015
@@ -17,8 +17,6 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-import java.util.List;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -64,8 +62,6 @@ public interface MessageQueue<M extends
    */
   public void add(M item);
 
-  public List<List<M>> getSubLists(int num);
-  
   /**
    * Clears all entries in the given queue.
    */

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Tue Apr 14 02:19:07 2015
@@ -17,8 +17,6 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
@@ -186,8 +184,4 @@ public final class SingleLockQueue<T ext
     }
   }
 
-  @Override
-  public List<List<T>> getSubLists(int num) {
-    return queue.getSubLists(num);
-  }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java Tue Apr 14 02:19:07 2015
@@ -17,7 +17,6 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 
@@ -26,8 +25,6 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
-import com.google.common.collect.Lists;
-
 /**
  * Heap (Java's priority queue) based message queue implementation that supports
  * sorted receive and send.
@@ -103,9 +100,4 @@ public final class SortedMemoryQueue<M e
     return this;
   }
 
-  @Override
-  public List<List<M>> getSubLists(int num) {
-    return Lists.partition(Lists.newArrayList(queue.iterator()), num);
-  }
-
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Tue Apr 14 02:19:07 2015
@@ -151,18 +151,6 @@ public class TestCheckpoint extends Test
       return null;
     }
 
-    @Override
-    public List<List<Text>> getSubLists(int num) {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-    @Override
-    public void clearIncomingMessages() {
-      // TODO Auto-generated method stub
-      
-    }
-
   }
 
   public static class TestBSPPeer implements
@@ -319,18 +307,6 @@ public class TestCheckpoint extends Test
       return null;
     }
 
-    @Override
-    public List<List<Text>> getSubLists(int num) {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-    @Override
-    public void clearIncomingMessages() {
-      // TODO Auto-generated method stub
-      
-    }
-
   }
 
   public static class TempSyncClient extends BSPPeerSyncClient {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Apr 14 02:19:07 2015
@@ -18,11 +18,11 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,8 +43,6 @@ import org.apache.hama.bsp.sync.SyncExce
 import org.apache.hama.commons.util.KeyValuePair;
 import org.apache.hama.util.ReflectionUtils;
 
-import com.google.common.collect.Lists;
-
 /**
  * Fully generic graph job runner.
  * 
@@ -232,47 +230,29 @@ public final class GraphJobRunner<V exte
    * iterate over our messages and vertices in sorted order. That means that we
    * need to seek the first vertex that has the same ID as the iterated message.
    */
+  @SuppressWarnings("unchecked")
   private void doSuperstep(GraphJobMessage currentMessage,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     this.changedVertexCnt = 0;
     vertices.startSuperstep();
 
-    List<Thread> runners = new ArrayList<Thread>();
+    ExecutorService executor = Executors.newFixedThreadPool(conf.getInt(
+        "hama.graph.thread.num", 1000));
 
-    List<List<GraphJobMessage>> subLists = peer.getSubLists(conf.getInt(
-        "hama.graph.thread.num", 100));
-
-    if (subLists.size() == 0) {
-      if (currentMessage != null) {
-        List<GraphJobMessage> first = new ArrayList<GraphJobMessage>();
-        first.add(currentMessage);
-        runners.add(new ComputeReceivedMessage(first));
-      }
-    } else {
-      for (List<GraphJobMessage> subList : subLists) {
-        if (runners.size() == 0)
-          subList.add(currentMessage);
-
-        runners.add(new ComputeReceivedMessage(subList));
-      }
-    }
-
-    for (Thread computer : runners) {
-      computer.start();
-    }
-
-    for (Thread computer : runners) {
-      try {
-        computer.join();
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
+    while (currentMessage != null) {
+      Runnable worker = new ComputeRunnable(
+          vertices.get((V) currentMessage.getVertexId()),
+          (Iterable<M>) currentMessage.getIterableMessages());
+      executor.execute(worker);
+      
+      currentMessage = peer.getCurrentMessage();
+    }
+    
+    executor.shutdown();
+    while (!executor.isTerminated()) {
     }
 
-    // After using getSubLists(), we need to clean up local vertex messages.
-    peer.clearIncomingMessages();
-
     for (V v : vertices.getNotComputedVertices()) {
       if (!vertices.get(v).isHalted()) {
         Vertex<V, E, M> vertex = vertices.get(v);
@@ -298,74 +278,36 @@ public final class GraphJobRunner<V exte
     this.changedVertexCnt = 0;
     vertices.startSuperstep();
 
-    List<Thread> runners = new ArrayList<Thread>();
-
-    for (List<V> vLists : Lists.partition(new ArrayList<V>(vertices.keySet()),
-        conf.getInt("hama.graph.thread.num", 100))) {
-      runners.add(new Computer(vLists));
-    }
-
-    for (Thread computer : runners) {
-      computer.start();
+    ExecutorService executor = Executors.newFixedThreadPool(conf.getInt(
+        "hama.graph.thread.num", 1000));
+    
+    for(Vertex<V, E, M> v : vertices.getValues()) {
+      Runnable worker = new ComputeRunnable(v, Collections.singleton(v.getValue()));
+      executor.execute(worker);
     }
-
-    for (Thread computer : runners) {
-      try {
-        computer.join();
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
+    executor.shutdown();
+    while (!executor.isTerminated()) {
     }
-
+    
     vertices.finishSuperstep();
     getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
     iteration++;
   }
 
-  class ComputeReceivedMessage extends Thread {
-    List<GraphJobMessage> subList;
-
-    public ComputeReceivedMessage(List<GraphJobMessage> subList) {
-      this.subList = subList;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void run() {
-      try {
-        for (GraphJobMessage msg : subList) {
-          Vertex<V, E, M> vertex = vertices.get((V) msg.getVertexId());
-
-          // reactivation
-          if (vertex.isHalted()) {
-            vertex.setActive();
-          }
-
-          vertex.compute((Iterable<M>) msg.getIterableMessages());
-          vertices.finishVertexComputation(vertex);
-        }
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  class Computer extends Thread {
-    List<V> vList;
-
-    public Computer(List<V> vList) {
-      this.vList = vList;
+  class ComputeRunnable implements Runnable {
+    Vertex<V, E, M> vertex;
+    Iterable<M> msgs;
+
+    public ComputeRunnable(Vertex<V, E, M> vertex, Iterable<M> msgs) {
+      this.vertex = vertex;
+      this.msgs = msgs;
     }
 
     @Override
     public void run() {
       try {
-        for (V v : vList) {
-          Vertex<V, E, M> vertex = vertices.get(v);
-          vertex.setup(conf);
-          vertex.compute(Collections.singleton(vertex.getValue()));
+          vertex.compute(msgs);
           vertices.finishVertexComputation(vertex);
-        }
       } catch (IOException e) {
         e.printStackTrace();
       }
@@ -430,15 +372,17 @@ public final class GraphJobRunner<V exte
   private void loadVertices(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
-    final VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
+    VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
         .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
             VertexInputReader.class));
 
+    ExecutorService executor = Executors.newFixedThreadPool(conf.getInt(
+        "hama.graph.thread.num", 1000));
+    
     try {
       KeyValuePair<Writable, Writable> next = null;
       while ((next = peer.readNext()) != null) {
-        // TODO read sequentially, and convert records using thread.
-        
+
         Vertex<V, E, M> vertex = GraphJobRunner
             .<V, E, M> newVertexInstance(VERTEX_CLASS);
 
@@ -450,7 +394,8 @@ public final class GraphJobRunner<V exte
 
         String dstHost = getHostName(vertex.getVertexID());
         if (peer.getPeerName().equals(dstHost)) {
-          addVertex(vertex);
+          Runnable worker = new LoadWorker(vertex);
+          executor.execute(worker);
         } else {
           peer.send(dstHost, new GraphJobMessage(vertex));
         }
@@ -458,46 +403,33 @@ public final class GraphJobRunner<V exte
     } catch (Exception e) {
       e.printStackTrace();
     }
+    
     peer.sync();
 
-    List<List<GraphJobMessage>> subLists = peer.getSubLists(conf.getInt(
-        "hama.graph.thread.num", 100));
-    List<Thread> runners = new ArrayList<Thread>(subLists.size());
-
-    for (List<GraphJobMessage> subList : subLists) {
-      runners.add(new LoadReceivedMessage(subList));
-    }
-
-    for (Thread computer : runners) {
-      computer.start();
+    GraphJobMessage msg;
+    while ((msg = peer.getCurrentMessage()) != null) {
+      Runnable worker = new LoadWorker((Vertex<V, E, M>) msg.getVertex());
+      executor.execute(worker);
     }
-
-    for (Thread computer : runners) {
-      try {
-        computer.join();
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
+    executor.shutdown();
+    while (!executor.isTerminated()) {
     }
 
     LOG.info(vertices.size() + " vertices are loaded into "
         + peer.getPeerName());
   }
+  
+  class LoadWorker implements Runnable {
+    Vertex<V, E, M> vertex;
 
-  class LoadReceivedMessage extends Thread {
-    List<GraphJobMessage> subList;
-
-    public LoadReceivedMessage(List<GraphJobMessage> subList) {
-      this.subList = subList;
+    public LoadWorker(Vertex<V, E, M> vertex) {
+      this.vertex = vertex;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void run() {
       try {
-        for (GraphJobMessage msg : subList) {
-          addVertex((Vertex<V, E, M>) msg.getVertex());
-        }
+        addVertex(vertex);
       } catch (IOException e) {
         e.printStackTrace();
       }
@@ -516,6 +448,9 @@ public final class GraphJobRunner<V exte
 
     vertex.setRunner(this);
     vertices.put(vertex);
+    
+    // call once
+    vertex.setup(conf);
   }
 
   /**

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Tue Apr 14 02:19:07 2015
@@ -17,8 +17,6 @@
  */
 package org.apache.hama.graph;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,8 +26,6 @@ import org.apache.hama.bsp.TaskAttemptID
 import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.bsp.message.queue.SynchronizedQueue;
 
-import com.google.common.collect.Lists;
-
 public class IncomingVertexMessageManager<M extends WritableComparable<M>>
     implements SynchronizedQueue<GraphJobMessage> {
 
@@ -111,12 +107,4 @@ public class IncomingVertexMessageManage
     return this;
   }
 
-  @Override
-  public List<List<GraphJobMessage>> getSubLists(int num) {
-    if (mapMessages.size() > 0)
-      return Lists.partition(new ArrayList<GraphJobMessage>(mapMessages), num);
-    else
-      return msgPerVertex.getSubLists(num);
-  }
-
 }



Mime
View raw message