giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [06/47] git commit: updated refs/heads/release-1.1 to 4c139ee
Date Sun, 26 Oct 2014 01:21:49 GMT
GIRAPH-919: Add worker to worker communication (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9e1a5a05
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9e1a5a05
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9e1a5a05

Branch: refs/heads/release-1.1
Commit: 9e1a5a05368a1543aaf6bea0176b9d79058e03b9
Parents: b218d72
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Tue Jun 24 09:51:40 2014 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Tue Jun 24 09:51:40 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../java/org/apache/giraph/comm/ServerData.java | 35 +++++++++
 .../giraph/comm/requests/RequestType.java       |  4 +-
 .../SendWorkerToWorkerMessageRequest.java       | 80 ++++++++++++++++++++
 .../apache/giraph/graph/GraphTaskManager.java   |  1 +
 .../apache/giraph/master/BspServiceMaster.java  | 11 ++-
 .../org/apache/giraph/worker/WorkerContext.java | 73 +++++++++++++++++-
 7 files changed, 202 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d315a9f..8da555d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-919: Add worker to worker communication (majakabiljo)
+
   GIRAPH-922: SimpleEdgeStore has a bug causing NPE (pavanka)
 
   GIRAPH-915: With BigDataIO some messages can get ignored (majakabiljo via pavanka)

http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index f0ecca2..b3f8733 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -36,6 +36,9 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -84,6 +87,13 @@ public class ServerData<I extends WritableComparable,
   /** Service worker */
   private final CentralizedServiceWorker<I, V, E> serviceWorker;
 
+  /** Store for current messages from other workers to this worker */
+  private volatile List<Writable> currentWorkerToWorkerMessages =
+      Collections.synchronizedList(new ArrayList<Writable>());
+  /** Store for messages from other workers to this worker for next superstep */
+  private volatile List<Writable> incomingWorkerToWorkerMessages =
+      Collections.synchronizedList(new ArrayList<Writable>());
+
   /**
    * Constructor.
    *
@@ -166,6 +176,10 @@ public class ServerData<I extends WritableComparable,
             messageStoreFactory.newStore(conf.getIncomingMessageValueFactory());
     incomingMessageStore =
         messageStoreFactory.newStore(conf.getOutgoingMessageValueFactory());
+
+    currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
+    incomingWorkerToWorkerMessages =
+        Collections.synchronizedList(new ArrayList<Writable>());
   }
 
   /**
@@ -204,4 +218,25 @@ public class ServerData<I extends WritableComparable,
   public CentralizedServiceWorker<I, V, E> getServiceWorker() {
     return this.serviceWorker;
   }
+
+  /**
+   * Get and clear worker to worker messages for this superstep. Can be
+   * called only once per superstep: the field is nulled on the first call.
+   *
+   * @return List of messages for this worker, or null if already taken
+   */
+  public List<Writable> getAndClearCurrentWorkerToWorkerMessages() {
+    List<Writable> ret = currentWorkerToWorkerMessages;
+    currentWorkerToWorkerMessages = null;
+    return ret;
+  }
+
+  /**
+   * Queue message for next superstep. Thread-safe (synchronized list).
+   *
+   * @param message Message received by this worker
+   */
+  public void addIncomingWorkerToWorkerMessage(Writable message) {
+    incomingWorkerToWorkerMessages.add(message);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index 7fe2ae7..408295c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -58,7 +58,9 @@ public enum RequestType {
   /** Send aggregators from master to worker owners */
   SEND_AGGREGATORS_TO_OWNER_REQUEST(SendAggregatorsToOwnerRequest.class),
   /** Send aggregators from worker owner to other workers */
-  SEND_AGGREGATORS_TO_WORKER_REQUEST(SendAggregatorsToWorkerRequest.class);
+  SEND_AGGREGATORS_TO_WORKER_REQUEST(SendAggregatorsToWorkerRequest.class),
+  /** Send message from worker to worker */
+  SEND_WORKER_TO_WORKER_MESSAGE_REQUEST(SendWorkerToWorkerMessageRequest.class);
 
   /** Class of request which this type corresponds to */
   private final Class<? extends WritableRequest> requestClass;

http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerToWorkerMessageRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerToWorkerMessageRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerToWorkerMessageRequest.java
new file mode 100644
index 0000000..a2505ef
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerToWorkerMessageRequest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/** Request sending any Writable (class name plus payload) to another worker */
+public class SendWorkerToWorkerMessageRequest extends WritableRequest
+    implements WorkerRequest<WritableComparable, Writable, Writable> {
+  /** Message carried by the request; null until set or deserialized */
+  private Writable message;
+
+  /**
+   * Default constructor, for reflection; leaves the message unset
+   */
+  public SendWorkerToWorkerMessageRequest() {
+  }
+
+  /**
+   * Constructor with the message to carry
+   *
+   * @param message Message to deliver to the receiving worker
+   */
+  public SendWorkerToWorkerMessageRequest(Writable message) {
+    this.message = message;
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_WORKER_TO_WORKER_MESSAGE_REQUEST;
+  }
+
+  @Override
+  void writeRequest(DataOutput output) throws IOException {
+    Text.writeString(output, message.getClass().getName());
+    message.write(output);
+  }
+
+  @Override
+  void readFieldsRequest(DataInput input) throws IOException {
+    String className = Text.readString(input);
+    try {
+      message = (Writable) Class.forName(className).newInstance();
+      message.readFields(input);
+    } catch (InstantiationException | IllegalAccessException |
+        ClassNotFoundException e) {
+      throw new IllegalStateException(
+          "readFieldsRequest: Exception occurred", e);
+    }
+  }
+
+  @Override
+  public void doRequest(
+      ServerData<WritableComparable, Writable, Writable> serverData) {
+    serverData.addIncomingWorkerToWorkerMessage(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index ad5fc91..e13eedd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -432,6 +432,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     serviceWorker.prepareSuperstep();
 
     serviceWorker.getWorkerContext().setGraphState(graphState);
+    serviceWorker.getWorkerContext().setupSuperstep(serviceWorker);
     GiraphTimerContext preSuperstepTimer = wcPreSuperstepTimer.time();
     serviceWorker.getWorkerContext().preSuperstep();
     preSuperstepTimer.stop();

http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index ad7e045..02d4f2b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -98,6 +98,8 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -175,7 +177,7 @@ public class BspServiceMaster<I extends WritableComparable,
   private MasterServer masterServer;
   /** Master info */
   private MasterInfo masterInfo;
-  /** List of workers in current superstep */
+  /** List of workers in current superstep, sorted by task id */
   private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
   /** Limit locality information added to each InputSplit znode */
   private final int localityLimit = 5;
@@ -1555,6 +1557,13 @@ public class BspServiceMaster<I extends WritableComparable,
       setJobStateFailed("coordinateSuperstep: Not enough healthy workers for " +
                     "superstep " + getSuperstep());
     } else {
+      // Sort this list, so order stays the same over supersteps
+      Collections.sort(chosenWorkerInfoList, new Comparator<WorkerInfo>() {
+        @Override
+        public int compare(WorkerInfo wi1, WorkerInfo wi2) {
+          return Integer.compare(wi1.getTaskId(), wi2.getTaskId());
+        }
+      });
       for (WorkerInfo workerInfo : chosenWorkerInfoList) {
         String workerInfoHealthyPath =
             getWorkerInfoHealthyPath(getApplicationAttempt(),

http://git-wip-us.apache.org/repos/asf/giraph/blob/9e1a5a05/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
index 17347db..29835c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -18,24 +18,36 @@
 
 package org.apache.giraph.worker;
 
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.requests.SendWorkerToWorkerMessageRequest;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.graph.GraphState;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper;
 
+import java.util.List;
+
 /**
  * WorkerContext allows for the execution of user code
  * on a per-worker basis. There's one WorkerContext per worker.
  */
 @SuppressWarnings("rawtypes")
 public abstract class WorkerContext
-  extends DefaultImmutableClassesGiraphConfigurable
-  implements WorkerAggregatorUsage {
+    extends DefaultImmutableClassesGiraphConfigurable
+    implements WorkerAggregatorUsage {
+
   /** Global graph state */
   private GraphState graphState;
   /** Worker aggregator usage */
   private WorkerAggregatorUsage workerAggregatorUsage;
 
+  /** Service worker */
+  private CentralizedServiceWorker serviceWorker;
+  /** Sorted list of all participating workers, including this one */
+  private List<WorkerInfo> workerList;
+  /** Index of this worker within workerList */
+  private int myWorkerIndex;
+
   /**
    * Set the graph state.
    *
@@ -46,6 +58,17 @@ public abstract class WorkerContext
   }
 
   /**
+   * Setup superstep: store the service worker, worker list and own index.
+   *
+   * @param serviceWorker Service worker to read the worker list and info from
+   */
+  public void setupSuperstep(CentralizedServiceWorker<?, ?, ?> serviceWorker) {
+    this.serviceWorker = serviceWorker;
+    workerList = serviceWorker.getWorkerInfoList();
+    myWorkerIndex = workerList.indexOf(serviceWorker.getWorkerInfo());
+  }
+
+  /**
    * Set worker aggregator usage
    *
    * @param workerAggregatorUsage Worker aggregator usage
@@ -81,6 +104,52 @@ public abstract class WorkerContext
   public abstract void preSuperstep();
 
   /**
+   * Get number of workers in the list stored by setupSuperstep
+   *
+   * @return Number of workers
+   */
+  public int getWorkerCount() {
+    return workerList.size();
+  }
+
+  /**
+   * Get index for this worker within the worker list
+   *
+   * @return Index of this worker, as computed by indexOf in setupSuperstep
+   */
+  public int getMyWorkerIndex() {
+    return myWorkerIndex;
+  }
+
+  /**
+   * Get messages sent to this worker (including by itself) and clear them
+   * (can be called once per superstep)
+   *
+   * @return Messages received, or null if already retrieved this superstep
+   */
+  public List<Writable> getAndClearMessagesFromOtherWorkers() {
+    return serviceWorker.getServerData().
+        getAndClearCurrentWorkerToWorkerMessages();
+  }
+
+  /**
+   * Send message to a worker; own index is delivered locally, not via client
+   *
+   * @param message Message to send
+   * @param workerIndex Index in the worker list of the destination worker
+   */
+  public void sendMessageToWorker(Writable message, int workerIndex) {
+    SendWorkerToWorkerMessageRequest request =
+        new SendWorkerToWorkerMessageRequest(message);
+    if (workerIndex == myWorkerIndex) {
+      request.doRequest(serviceWorker.getServerData());
+    } else {
+      serviceWorker.getWorkerClient().sendWritableRequest(
+          workerList.get(workerIndex).getTaskId(), request);
+    }
+  }
+
+  /**
    * Execute user code.
    * This method is executed once on each Worker after each
    * superstep ends.


Mime
View raw message