giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ereis...@apache.org
Subject [1/3] GIRAPH-469: Refactor GraphMapper (ereisman)
Date Mon, 28 Jan 2013 22:18:38 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/303386f7/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index f2ccb24..d5ad62b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -31,16 +31,16 @@ import org.apache.giraph.comm.netty.NettyWorkerClient;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerServer;
 import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.graph.AddressesAndPartitionsWritable;
+import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.graph.GraphTaskManager;
+import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.graph.InputSplitPaths;
+import org.apache.giraph.graph.InputSplitEvents;
 import org.apache.giraph.graph.FinishedSuperstepStats;
+import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.GlobalStats;
-import org.apache.giraph.graph.GraphMapper;
-import org.apache.giraph.graph.GraphState;
-import org.apache.giraph.graph.InputSplitEvents;
-import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.vertex.Vertex;
-import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
 import org.apache.giraph.partition.Partition;
@@ -159,7 +159,7 @@ public class BspServiceWorker<I extends WritableComparable,
    * @param serverPortList ZooKeeper server port list
    * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
    * @param context Mapper context
-   * @param graphMapper Graph mapper
+   * @param graphTaskManager GraphTaskManager for this compute node
    * @throws IOException
    * @throws InterruptedException
    */
@@ -167,9 +167,9 @@ public class BspServiceWorker<I extends WritableComparable,
     String serverPortList,
     int sessionMsecTimeout,
     Mapper<?, ?, ?, ?>.Context context,
-    GraphMapper<I, V, E, M> graphMapper)
+    GraphTaskManager<I, V, E, M> graphTaskManager)
     throws IOException, InterruptedException {
-    super(serverPortList, sessionMsecTimeout, context, graphMapper);
+    super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
     partitionExchangeChildrenChanged = new PredicateLock(context);
     registerBspEvent(partitionExchangeChildrenChanged);
     workerGraphPartitioner =
@@ -294,7 +294,7 @@ public class BspServiceWorker<I extends WritableComparable,
             false, false, true);
 
     GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
-        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
+        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphTaskManager(),
         null, null);
 
     VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
@@ -321,7 +321,7 @@ public class BspServiceWorker<I extends WritableComparable,
             false, false, true);
 
     GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
-        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
+        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphTaskManager(),
         null, null);
 
     EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
@@ -461,7 +461,7 @@ public class BspServiceWorker<I extends WritableComparable,
     // Add the partitions that this worker owns
     GraphState<I, V, E, M> graphState =
         new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0,
-            getContext(), getGraphMapper(), null, null);
+            getContext(), getGraphTaskManager(), null, null);
     Collection<? extends PartitionOwner> masterSetPartitionOwners =
         startSuperstep(graphState);
     workerGraphPartitioner.updatePartitionOwners(
@@ -695,7 +695,7 @@ else[HADOOP_NON_SECURE]*/
     }
 
     getContext().setStatus("startSuperstep: " +
-        getGraphMapper().getGraphFunctions().toString() +
+        getGraphTaskManager().getGraphFunctions().toString() +
         " - Attempt=" + getApplicationAttempt() +
         ", Superstep=" + getSuperstep());
     return addressesAndPartitions.getPartitionOwners();
@@ -719,7 +719,7 @@ else[HADOOP_NON_SECURE]*/
     // 6. Wait for the master's global stats, and check if done
     waitForRequestsToFinish();
 
-    graphState.getGraphMapper().notifyFinishedCommunication();
+    graphState.getGraphTaskManager().notifyFinishedCommunication();
 
     long workerSentMessages = 0;
     for (PartitionStats partitionStats : partitionStatsList) {
@@ -747,7 +747,7 @@ else[HADOOP_NON_SECURE]*/
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
         "finishSuperstep: (waiting for rest " +
             "of workers) " +
-            getGraphMapper().getGraphFunctions().toString() +
+            getGraphTaskManager().getGraphFunctions().toString() +
             " - Attempt=" + getApplicationAttempt() +
             ", Superstep=" + getSuperstep());
 
@@ -765,7 +765,7 @@ else[HADOOP_NON_SECURE]*/
     }
     incrCachedSuperstep();
     getContext().setStatus("finishSuperstep: (all workers done) " +
-        getGraphMapper().getGraphFunctions().toString() +
+        getGraphTaskManager().getGraphFunctions().toString() +
         " - Attempt=" + getApplicationAttempt() +
         ", Superstep=" + getSuperstep());
 
@@ -952,7 +952,7 @@ else[HADOOP_NON_SECURE]*/
   public void storeCheckpoint() throws IOException {
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
         "storeCheckpoint: Starting checkpoint " +
-            getGraphMapper().getGraphFunctions().toString() +
+            getGraphTaskManager().getGraphFunctions().toString() +
             " - Attempt=" + getApplicationAttempt() +
             ", Superstep=" + getSuperstep());
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/303386f7/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index acd4e2d..d09ca2b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -131,7 +131,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
             context, configuration, bspServiceWorker);
     this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
         graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
-        context, graphState.getGraphMapper(), workerClientRequestProcessor,
+        context, graphState.getGraphTaskManager(), workerClientRequestProcessor,
         null);
     this.useLocality = configuration.getBoolean(
         GiraphConstants.USE_INPUT_SPLIT_LOCALITY,


Mime
View raw message