Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2316E9115 for ; Wed, 3 Oct 2012 03:05:49 +0000 (UTC) Received: (qmail 14147 invoked by uid 500); 3 Oct 2012 03:05:49 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 14096 invoked by uid 500); 3 Oct 2012 03:05:48 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 14073 invoked by uid 99); 3 Oct 2012 03:05:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Oct 2012 03:05:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Oct 2012 03:05:45 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 825532388900; Wed, 3 Oct 2012 03:05:02 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1393266 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/comm/messages/ src/main/java/org/apache/giraph/comm/netty/ src/main/java/org/apache/giraph/comm/requests/ src/main/java/org/apache/giraph/... Date: Wed, 03 Oct 2012 03:05:01 -0000 To: commits@giraph.apache.org From: aching@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121003030502.825532388900@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: aching Date: Wed Oct 3 03:05:01 2012 New Revision: 1393266 URL: http://svn.apache.org/viewvc?rev=1393266&view=rev Log: GIRAPH-328: Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition. (Eli Reisman via aching) Added: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java Removed: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java Modified: giraph/trunk/CHANGELOG giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Modified: giraph/trunk/CHANGELOG URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1393266&r1=1393265&r2=1393266&view=diff ============================================================================== --- giraph/trunk/CHANGELOG (original) +++ giraph/trunk/CHANGELOG Wed Oct 3 03:05:01 2012 @@ -2,6 +2,10 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-328: Outgoing messages from current superstep should be + grouped at the sender by owning worker, not by partition. (Eli + Reisman via aching) + GIRAPH-293: Should aggregators be checkpointed? (majakabiljo via aching) Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1393266&r1=1393265&r2=1393266&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java Wed Oct 3 03:05:01 2012 @@ -26,13 +26,14 @@ import java.util.Map; import org.apache.giraph.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.VertexCombiner; +import org.apache.giraph.graph.WorkerInfo; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import com.google.common.collect.Lists; /** - * Aggregates the messages to be send to partitions so they can be sent + * Aggregates the messages to be send to workers so they can be sent * in bulk. * * @param Vertex id @@ -44,11 +45,11 @@ public class SendMessageCache combiner; /** Internal cache */ - private Map>> messageCache = - new HashMap>>(); + private Map>>> messageCache = + new HashMap>>>(); /** Number of messages in each partition */ - private final Map messageCountMap = - new HashMap(); + private final Map messageCountMap = + new HashMap(); /** * Constructor @@ -66,17 +67,27 @@ public class SendMessageCachehost => partition => vertex * @return Number of messages in the partition. */ - public int addMessage(Integer partitionId, I destVertexId, M message) { + public int addMessage(WorkerInfo workerInfo, + final int partitionId, I destVertexId, M message) { // Get the message collection - Map> idMessagesMap = messageCache.get(partitionId); + Map>> partitionMap = + messageCache.get(workerInfo); + if (partitionMap == null) { + partitionMap = new HashMap>>(); + messageCache.put(workerInfo, partitionMap); + } + Map> idMessagesMap = partitionMap.get(partitionId); + if (idMessagesMap == null) { idMessagesMap = new HashMap>(); - messageCache.put(partitionId, idMessagesMap); + partitionMap.put(partitionId, idMessagesMap); } Collection messages = idMessagesMap.get(destVertexId); if (messages == null) { @@ -85,7 +96,7 @@ public class SendMessageCacheworkerInfo */ - public Map> removePartitionMessages(int partitionId) { - Map> idMessages = messageCache.remove(partitionId); - messageCountMap.put(partitionId, 0); - return idMessages; + public Map>> removeWorkerMessages( + WorkerInfo workerInfo) { + Map>> workerMessages = + messageCache.remove(workerInfo); + messageCountMap.put(workerInfo, 0); + return workerMessages; } /** @@ -125,9 +140,12 @@ public class SendMessageCache>> removeAllPartitionMessages() { - Map>> allMessages = messageCache; - messageCache = new HashMap>>(); + public Map>>> removeAllMessages() { + Map>>> + allMessages = messageCache; + messageCache = + new HashMap>>>(); messageCountMap.clear(); return allMessages; } Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1393266&r1=1393265&r2=1393266&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Wed Oct 3 03:05:01 2012 @@ -84,7 +84,7 @@ public class SimpleMessageStore messages) throws IOException { - int partitionId = getPartitonId(vertexId); + int partitionId = getPartitionId(vertexId); ConcurrentMap> partitionMap = map.get(partitionId); if (partitionMap == null) { ConcurrentMap> tmpMap = @@ -96,13 +96,12 @@ public class SimpleMessageStore currentMessages = - CollectionUtils.addConcurrent(vertexId, messages, partitionMap); + CollectionUtils.addConcurrent(vertexId, messages, partitionMap); if (combiner != null) { synchronized (currentMessages) { currentMessages = - Lists.newArrayList(combiner.combine(vertexId, currentMessages)); + Lists.newArrayList(combiner.combine(vertexId, currentMessages)); partitionMap.put(vertexId, currentMessages); } } @@ -148,9 +147,9 @@ public class SimpleMessageStore getVertexMessages(I vertexId) throws IOException { ConcurrentMap> partitionMap = - map.get(getPartitonId(vertexId)); + map.get(getPartitionId(vertexId)); return (partitionMap == null) ? Collections.emptyList() : - map.get(getPartitonId(vertexId)).get(vertexId); + partitionMap.get(vertexId); } @Override @@ -167,7 +166,7 @@ public class SimpleMessageStore> partitionMap = - map.get(getPartitonId(vertexId)); + map.get(getPartitionId(vertexId)); return (partitionMap == null) ? false : partitionMap.containsKey(vertexId); } @@ -190,7 +189,7 @@ public class SimpleMessageStore> partitionMap = - map.get(getPartitonId(vertexId)); + map.get(getPartitionId(vertexId)); if (partitionMap != null) { partitionMap.remove(vertexId); } @@ -212,7 +211,7 @@ public class SimpleMessageStore conf; /** Netty client that does that actual I/O */ @@ -89,8 +92,8 @@ public class NettyWorkerClient sendMutationsCache; - /** Maximum number of messages per partition before sending */ - private final int maxMessagesPerPartition; + /** Maximum number of messages per remote worker to cache before sending */ + private final int maxMessagesPerWorker; /** Maximum number of mutations per partition before sending */ private final int maxMutationsPerPartition; /** Maximum number of attempts to resolve an address*/ @@ -116,7 +119,7 @@ public class NettyWorkerClient= maxResolveAddressAttempts) { - throw new IllegalStateException("getInetSocketAddress: Coudldn't " + - "resolve " + address + " in " + resolveAttempts + " tries."); + address = resolveAddress(workerInfo.getInetSocketAddress()); + if (partitionId != NO_PARTITION_ID) { + // Only cache valid partition ids + partitionIndexAddressMap.put(partitionId, address); } - partitionIndexAddressMap.put(partitionId, address); } return address; } /** + * Utility method for getInetSocketAddress() + * @param address the address we are attempting to resolve + * @return the successfully resolved address. + * @throws IllegalStateException if the address is not resolved + * in maxResolveAddressAttempts tries. + */ + private InetSocketAddress resolveAddress(InetSocketAddress address) { + int resolveAttempts = 0; + while (address.isUnresolved() && + resolveAttempts < maxResolveAddressAttempts) { + ++resolveAttempts; + LOG.warn("resolveAddress: Failed to resolve " + address + + " on attempt " + resolveAttempts + " of " + + maxResolveAddressAttempts + " attempts, sleeping for 5 seconds"); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + LOG.warn("resolveAddress: Interrupted.", e); + } + } + if (resolveAttempts >= maxResolveAddressAttempts) { + throw new IllegalStateException("resolveAddress: Couldn't " + + "resolve " + address + " in " + resolveAttempts + " tries."); + } + return address; + } + + /** * When doing the request, short circuit if it is local * * @param workerInfo Worker info @@ -218,56 +233,55 @@ public class NettyWorkerClient= maxMessagesPerPartition) { - InetSocketAddress remoteServerAddress = - getInetSocketAddress(partitionOwner.getWorkerInfo(), partitionId); - Map> partitionMessages = - sendMessageCache.removePartitionMessages(partitionId); + // Send a request if the cache of outgoing message to + // the remote worker 'workerInfo' is full enough to be flushed + if (workerMessageCount >= maxMessagesPerWorker) { + Map>> workerMessages = + sendMessageCache.removeWorkerMessages(workerInfo); + InetSocketAddress remoteWorkerAddress = + getInetSocketAddress(workerInfo, partitionId); WritableRequest writableRequest = - new SendPartitionMessagesRequest( - partitionId, partitionMessages); - doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress, - writableRequest); + new SendWorkerMessagesRequest(workerMessages); + doRequest(workerInfo, remoteWorkerAddress, writableRequest); } } @Override public void sendPartitionRequest(WorkerInfo workerInfo, Partition partition) { + final int partitionId = partition.getId(); InetSocketAddress remoteServerAddress = - getInetSocketAddress(workerInfo, partition.getId()); + getInetSocketAddress(workerInfo, partitionId); if (LOG.isTraceEnabled()) { LOG.trace("sendPartitionRequest: Sending to " + remoteServerAddress + " from " + workerInfo + ", with partition " + partition); } - int partitionId = partition.getId(); WritableRequest vertexRequest = new SendVertexRequest(partitionId, partition.getVertices()); @@ -288,7 +302,7 @@ public class NettyWorkerClient maxMessagesPerPartition) { + if (messagesInMap > maxMessagesPerWorker) { WritableRequest messagesRequest = new SendPartitionCurrentMessagesRequest(partitionId, map); doRequest(workerInfo, remoteServerAddress, messagesRequest); @@ -407,21 +421,19 @@ public class NettyWorkerClient>> remainingMessageCache = - sendMessageCache.removeAllPartitionMessages(); - for (Entry>> entry : - remainingMessageCache.entrySet()) { + Map>>> remainingMessageCache = + sendMessageCache.removeAllMessages(); + for (Entry>>> entry : + remainingMessageCache.entrySet()) { + Iterator cachedPartitionId = + entry.getValue().keySet().iterator(); + final int partitionId = cachedPartitionId.hasNext() ? + cachedPartitionId.next() : NO_PARTITION_ID; + InetSocketAddress remoteWorkerAddress = + getInetSocketAddress(entry.getKey(), partitionId); WritableRequest writableRequest = - new SendPartitionMessagesRequest( - entry.getKey(), entry.getValue()); - PartitionOwner partitionOwner = - service.getVertexPartitionOwner( - entry.getValue().keySet().iterator().next()); - InetSocketAddress remoteServerAddress = - getInetSocketAddress(partitionOwner.getWorkerInfo(), - partitionOwner.getPartitionId()); - doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress, - writableRequest); + new SendWorkerMessagesRequest(entry.getValue()); + doRequest(entry.getKey(), remoteWorkerAddress, writableRequest); } // Execute the remaining sends mutations (if any) Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java?rev=1393266&r1=1393265&r2=1393266&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java Wed Oct 3 03:05:01 2012 @@ -25,7 +25,7 @@ public enum RequestType { /** Sending vertices request */ SEND_VERTEX_REQUEST(SendVertexRequest.class), /** Sending a partition of messages for next superstep */ - SEND_PARTITION_MESSAGES_REQUEST(SendPartitionMessagesRequest.class), + SEND_WORKER_MESSAGES_REQUEST(SendWorkerMessagesRequest.class), /** * Sending a partition of messages for current superstep * (used during partition exchange) Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java?rev=1393266&r1=1393265&r2=1393266&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java Wed Oct 3 03:05:01 2012 @@ -18,13 +18,19 @@ package org.apache.giraph.comm.requests; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import org.apache.giraph.comm.ServerData; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.Map.Entry; /** * Send a collection of vertex messages for a partition. It adds messages to @@ -36,8 +42,13 @@ import java.util.Map; * @param Message data */ public class SendPartitionCurrentMessagesRequest extends - SendPartitionMessagesRequest { + V extends Writable, E extends Writable, M extends Writable> extends + WritableRequest implements WorkerRequest { + /** the destination partition for these vertices' messages*/ + private int partitionId; + /** map of destination vertex ID's to message lists */ + private Map> vertexMessageMap; + /** Constructor used for reflection only */ public SendPartitionCurrentMessagesRequest() { } @@ -48,8 +59,10 @@ public class SendPartitionCurrentMessage * @param vertexIdMessages Map of messages to send */ public SendPartitionCurrentMessagesRequest(int partitionId, - Map> vertexIdMessages) { - super(partitionId, vertexIdMessages); + Map> vertexIdMessages) { + super(); + this.partitionId = partitionId; + this.vertexMessageMap = vertexIdMessages; } @Override @@ -58,10 +71,44 @@ public class SendPartitionCurrentMessage } @Override + public void readFieldsRequest(DataInput input) throws IOException { + partitionId = input.readInt(); + final int numVertices = input.readInt(); + vertexMessageMap = + Maps.>newHashMapWithExpectedSize(numVertices); + for (int i = 0; i < numVertices; ++i) { + I nextVertex = getConf().createVertexId(); + nextVertex.readFields(input); + final int numMessages = input.readInt(); + Collection messagesForVertex = + Lists.newArrayListWithExpectedSize(numMessages); + vertexMessageMap.put(nextVertex, messagesForVertex); + for (int j = 0; j < numMessages; ++j) { + M nextMessage = getConf().createMessageValue(); + nextMessage.readFields(input); + messagesForVertex.add(nextMessage); + } + } + } + + @Override + public void writeRequest(DataOutput output) throws IOException { + output.writeInt(partitionId); + output.writeInt(vertexMessageMap.size()); + for (Entry> entry : vertexMessageMap.entrySet()) { + entry.getKey().write(output); + output.writeInt(entry.getValue().size()); + for (M message : entry.getValue()) { + message.write(output); + } + } + } + + @Override public void doRequest(ServerData serverData) { try { serverData.getCurrentMessageStore().addPartitionMessages( - getVertexIdMessages(), getPartitionId()); + vertexMessageMap, partitionId); } catch (IOException e) { throw new RuntimeException("doRequest: Got IOException ", e); } Added: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java?rev=1393266&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java Wed Oct 3 03:05:01 2012 @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.requests; + +import org.apache.giraph.comm.ServerData; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * Send a collection of vertex messages for a partition. + * + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data + */ +@SuppressWarnings("rawtypes") +public class SendWorkerMessagesRequest extends + WritableRequest implements WorkerRequest { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(SendWorkerMessagesRequest.class); + /** + * All messages for a group of vertices, organized by partition, which + * are owned by a single (destination) worker. These messages are all + * destined for this worker. + * */ + private Map>> partitionVertexMessagesMap; + + /** + * Constructor used for reflection only + */ + public SendWorkerMessagesRequest() { } + + /** + * Constructor used to send request. + * + * @param partVertMsgsMap Map of remote partitions => vertices => messages + */ + public SendWorkerMessagesRequest( + Map>> partVertMsgsMap) { + super(); + this.partitionVertexMessagesMap = partVertMsgsMap; + } + + @Override + public void readFieldsRequest(DataInput input) throws IOException { + int numPartitions = input.readInt(); + partitionVertexMessagesMap = Maps.>> + newHashMapWithExpectedSize(numPartitions); + while (numPartitions-- > 0) { + final int partitionId = input.readInt(); + int numVertices = input.readInt(); + Map> vertexIdMessages = + Maps.>newHashMapWithExpectedSize(numVertices); + partitionVertexMessagesMap.put(partitionId, vertexIdMessages); + while (numVertices-- > 0) { + I vertexId = getConf().createVertexId(); + vertexId.readFields(input); + int messageCount = input.readInt(); + List messageList = + Lists.newArrayListWithExpectedSize(messageCount); + while (messageCount-- > 0) { + M message = getConf().createMessageValue(); + message.readFields(input); + messageList.add(message); + } + if (vertexIdMessages.put(vertexId, messageList) != null) { + throw new IllegalStateException( + "readFields: Already has vertex id " + vertexId); + } + } + } + } + + @Override + public void writeRequest(DataOutput output) throws IOException { + output.writeInt(partitionVertexMessagesMap.size()); + for (Entry>> partitionEntry : + partitionVertexMessagesMap.entrySet()) { + output.writeInt(partitionEntry.getKey()); + output.writeInt(partitionEntry.getValue().size()); + for (Entry> vertexEntry : + partitionEntry.getValue().entrySet()) { + vertexEntry.getKey().write(output); + output.writeInt(vertexEntry.getValue().size()); + for (M message : vertexEntry.getValue()) { + message.write(output); + } + } + } + } + + @Override + public RequestType getType() { + return RequestType.SEND_WORKER_MESSAGES_REQUEST; + } + + @Override + public void doRequest(ServerData serverData) { + for (Entry>> entry : + partitionVertexMessagesMap.entrySet()) { + try { + serverData.getIncomingMessageStore() + .addPartitionMessages(entry.getValue(), entry.getKey()); + } catch (IOException e) { + throw new RuntimeException("doRequest: Got IOException ", e); + } + } + } +} Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1393266&r1=1393265&r2=1393266&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Oct 3 03:05:01 2012 @@ -445,10 +445,10 @@ public class BspServiceMaster partitionSet = new TreeSet(); for (WorkerInfo workerInfo : healthyWorkerInfoList) { - partitionSet.add(workerInfo.getPartitionId()); + partitionSet.add(workerInfo.getTaskId()); } for (WorkerInfo workerInfo : unhealthyWorkerInfoList) { - partitionSet.add(workerInfo.getPartitionId()); + partitionSet.add(workerInfo.getTaskId()); } for (int i = 1; i <= maxWorkers; ++i) { if (partitionSet.contains(Integer.valueOf(i))) { Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1393266&r1=1393265&r2=1393266&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java Wed Oct 3 03:05:01 2012 @@ -31,8 +31,8 @@ import org.apache.hadoop.io.Writable; public class WorkerInfo implements Writable { /** Worker hostname */ private String hostname; - /** Partition id of this worker */ - private int partitionId = -1; + /** Task Partition (Worker) ID of this worker */ + private int taskId = -1; /** Port that the RPC server is using */ private int port = -1; /** Hostname + "_" + id for easier debugging */ @@ -45,25 +45,25 @@ public class WorkerInfo implements Writa } /** - * Constructor with paramters. + * Constructor with parameters. * * @param hostname Hostname of this worker. - * @param partitionId partition id of this particular object. + * @param taskId the task partition for this worker * @param port Port of the service. */ - public WorkerInfo(String hostname, int partitionId, int port) { + public WorkerInfo(String hostname, int taskId, int port) { this.hostname = hostname; - this.partitionId = partitionId; + this.taskId = taskId; this.port = port; - this.hostnameId = hostname + "_" + partitionId; + this.hostnameId = hostname + "_" + taskId; } public String getHostname() { return hostname; } - public int getPartitionId() { - return partitionId; + public int getTaskId() { + return taskId; } public String getHostnameId() { @@ -88,7 +88,7 @@ public class WorkerInfo implements Writa if (other instanceof WorkerInfo) { WorkerInfo workerInfo = (WorkerInfo) other; if (hostname.equals(workerInfo.getHostname()) && - (partitionId == workerInfo.getPartitionId()) && + (taskId == workerInfo.getTaskId()) && (port == workerInfo.getPort())) { return true; } @@ -101,28 +101,28 @@ public class WorkerInfo implements Writa int result = 17; result = 37 * result + port; result = 37 * result + hostname.hashCode(); - result = 37 * result + partitionId; + result = 37 * result + taskId; return result; } @Override public String toString() { - return "Worker(hostname=" + hostname + ", MRpartition=" + - partitionId + ", port=" + port + ")"; + return "Worker(hostname=" + hostname + ", MRtaskID=" + + taskId + ", port=" + port + ")"; } @Override public void readFields(DataInput input) throws IOException { hostname = input.readUTF(); - partitionId = input.readInt(); + taskId = input.readInt(); port = input.readInt(); - hostnameId = hostname + "_" + partitionId; + hostnameId = hostname + "_" + taskId; } @Override public void write(DataOutput output) throws IOException { output.writeUTF(hostname); - output.writeInt(partitionId); + output.writeInt(taskId); output.writeInt(port); } } Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1393266&r1=1393265&r2=1393266&view=diff ============================================================================== --- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original) +++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Wed Oct 3 03:05:01 2012 @@ -24,7 +24,7 @@ import org.apache.giraph.comm.messages.S import org.apache.giraph.comm.netty.NettyClient; import org.apache.giraph.comm.netty.NettyServer; import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler; -import org.apache.giraph.comm.requests.SendPartitionMessagesRequest; +import org.apache.giraph.comm.requests.SendWorkerMessagesRequest; import org.apache.giraph.comm.requests.WritableRequest; import org.apache.giraph.graph.EdgeListVertex; import org.apache.giraph.utils.MockUtils; @@ -84,9 +84,12 @@ public class RequestFailureTest { private WritableRequest getRequest() { // Data to send - int partitionId = 0; + final int partitionId = 0; + Map>> sendMap = + Maps.newHashMap(); Map> vertexIdMessages = Maps.newHashMap(); + sendMap.put(partitionId, vertexIdMessages); for (int i = 1; i < 7; ++i) { IntWritable vertexId = new IntWritable(i); Collection messages = Lists.newArrayList(); @@ -97,10 +100,10 @@ public class RequestFailureTest { } // Send the request - SendPartitionMessagesRequest request = - new SendPartitionMessagesRequest(partitionId, vertexIdMessages); + SendWorkerMessagesRequest request = + new SendWorkerMessagesRequest(sendMap); return request; } Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1393266&r1=1393265&r2=1393266&view=diff ============================================================================== --- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original) +++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Wed Oct 3 03:05:01 2012 @@ -24,7 +24,7 @@ import org.apache.giraph.comm.messages.S import org.apache.giraph.comm.netty.NettyClient; import org.apache.giraph.comm.netty.NettyServer; import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler; -import org.apache.giraph.comm.requests.SendPartitionMessagesRequest; +import org.apache.giraph.comm.requests.SendWorkerMessagesRequest; import org.apache.giraph.comm.requests.SendPartitionMutationsRequest; import org.apache.giraph.comm.requests.SendVertexRequest; import org.apache.giraph.graph.Edge; @@ -145,11 +145,14 @@ public class RequestTest { } @Test - public void sendPartitionMessagesRequest() throws IOException { + public void sendWorkerMessagesRequest() throws IOException { // Data to send + Map>> sendMap = + Maps.newHashMap(); int partitionId = 0; Map> vertexIdMessages = Maps.newHashMap(); + sendMap.put(partitionId, vertexIdMessages); for (int i = 1; i < 7; ++i) { IntWritable vertexId = new IntWritable(i); Collection messages = Lists.newArrayList(); @@ -160,10 +163,10 @@ public class RequestTest { } // Send the request - SendPartitionMessagesRequest request = - new SendPartitionMessagesRequest(partitionId, vertexIdMessages); + SendWorkerMessagesRequest request = + new SendWorkerMessagesRequest(sendMap); client.sendWritableRequest(-1, server.getMyAddress(), request); client.waitAllRequests();