Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0B10FD9C0 for ; Thu, 20 Dec 2012 04:25:50 +0000 (UTC) Received: (qmail 56136 invoked by uid 500); 20 Dec 2012 04:25:49 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 56066 invoked by uid 500); 20 Dec 2012 04:25:49 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 54114 invoked by uid 99); 20 Dec 2012 04:25:32 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Dec 2012 04:25:32 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E2E943247CC; Thu, 20 Dec 2012 04:25:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: nitay@apache.org To: commits@giraph.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [19/51] [partial] GIRAPH-457: update module names (nitay) Message-Id: <20121220042531.E2E943247CC@tyr.zones.apache.org> Date: Thu, 20 Dec 2012 04:25:31 +0000 (UTC) http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java new file mode 100644 index 0000000..5460a55 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java @@ -0,0 +1,277 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.graph.partition; + +import com.google.common.collect.MapMaker; + +import com.google.common.primitives.Ints; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.UnsafeByteArrayInputStream; +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.Progressable; +import org.apache.log4j.Logger; + +/** + * Byte array based partition. Should reduce the amount of memory used since + * the entire graph is compressed into byte arrays. Must guarantee, however, + * that only one thread at a time will call getVertex since it is a singleton. 
+ * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message data + */ +public class ByteArrayPartition + implements Partition { + /** Class logger */ + private static final Logger LOG = Logger.getLogger(ByteArrayPartition.class); + /** Configuration from the worker */ + private ImmutableClassesGiraphConfiguration conf; + /** Partition id */ + private int id; + /** + * Vertex map for this range (keyed by index). Note that the byte[] is a + * serialized vertex with the first four bytes as the length of the vertex + * to read. + */ + private ConcurrentMap vertexMap; + /** Context used to report progress */ + private Progressable progressable; + /** Representative vertex */ + private Vertex representativeVertex; + /** Use unsafe serialization */ + private boolean useUnsafeSerialization; + + /** + * Constructor for reflection. + */ + public ByteArrayPartition() { } + + @Override + public void initialize(int partitionId, Progressable progressable) { + setId(partitionId); + setProgressable(progressable); + vertexMap = new MapMaker().concurrencyLevel( + conf.getNettyServerExecutionConcurrency()).makeMap(); + representativeVertex = conf.createVertex(); + useUnsafeSerialization = conf.useUnsafeSerialization(); + } + + @Override + public Vertex getVertex(I vertexIndex) { + byte[] vertexData = vertexMap.get(vertexIndex); + if (vertexData == null) { + return null; + } + WritableUtils.readFieldsFromByteArrayWithSize( + vertexData, representativeVertex, useUnsafeSerialization); + return representativeVertex; + } + + @Override + public Vertex putVertex(Vertex vertex) { + byte[] vertexData = + WritableUtils.writeToByteArrayWithSize(vertex, useUnsafeSerialization); + byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData); + if (oldVertexBytes == null) { + return null; + } else { + WritableUtils.readFieldsFromByteArrayWithSize( + oldVertexBytes, representativeVertex, useUnsafeSerialization); + return representativeVertex; + } + } + + 
@Override + public Vertex removeVertex(I vertexIndex) { + byte[] vertexBytes = vertexMap.remove(vertexIndex); + if (vertexBytes == null) { + return null; + } + WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes, + representativeVertex, useUnsafeSerialization); + return representativeVertex; + } + + @Override + public void addPartition(Partition partition) { + // Only work with other ByteArrayPartition instances + if (!(partition instanceof ByteArrayPartition)) { + throw new IllegalStateException("addPartition: Cannot add partition " + + "of type " + partition.getClass()); + } + + ByteArrayPartition byteArrayPartition = + (ByteArrayPartition) partition; + for (Map.Entry entry : + byteArrayPartition.vertexMap.entrySet()) { + vertexMap.put(entry.getKey(), entry.getValue()); + } + } + + @Override + public long getVertexCount() { + return vertexMap.size(); + } + + @Override + public long getEdgeCount() { + long edges = 0; + for (byte[] vertexBytes : vertexMap.values()) { + WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes, + representativeVertex, useUnsafeSerialization); + edges += representativeVertex.getNumEdges(); + } + return edges; + } + + @Override + public int getId() { + return id; + } + + @Override + public void setId(int id) { + this.id = id; + } + + @Override + public void setProgressable(Progressable progressable) { + this.progressable = progressable; + } + + @Override + public void saveVertex(Vertex vertex) { + // Reuse the old buffer whenever possible + byte[] oldVertexData = vertexMap.get(vertex.getId()); + if (oldVertexData != null) { + vertexMap.put(vertex.getId(), + WritableUtils.writeToByteArrayWithSize( + vertex, oldVertexData, useUnsafeSerialization)); + } else { + vertexMap.put(vertex.getId(), + WritableUtils.writeToByteArrayWithSize( + vertex, useUnsafeSerialization)); + } + } + + @Override + public void write(DataOutput output) throws IOException { + output.writeInt(id); + output.writeInt(vertexMap.size()); + for (Map.Entry entry : 
vertexMap.entrySet()) { + if (progressable != null) { + progressable.progress(); + } + entry.getKey().write(output); + // Note here that we are writing the size of the vertex data first + // as it is encoded in the first four bytes of the byte[] + int vertexDataSize; + if (useUnsafeSerialization) { + vertexDataSize = UnsafeByteArrayInputStream.getInt(entry.getValue(), + 0); + } else { + vertexDataSize = Ints.fromByteArray(entry.getValue()); + } + + output.writeInt(vertexDataSize); + output.write(entry.getValue(), 0, vertexDataSize); + } + } + + @Override + public void readFields(DataInput input) throws IOException { + id = input.readInt(); + int size = input.readInt(); + vertexMap = new MapMaker().concurrencyLevel( + conf.getNettyServerExecutionConcurrency()).initialCapacity( + size).makeMap(); + representativeVertex = conf.createVertex(); + useUnsafeSerialization = conf.useUnsafeSerialization(); + for (int i = 0; i < size; ++i) { + if (progressable != null) { + progressable.progress(); + } + I vertexId = conf.createVertexId(); + vertexId.readFields(input); + int vertexDataSize = input.readInt(); + byte[] vertexData = new byte[vertexDataSize]; + input.readFully(vertexData); + if (vertexMap.put(vertexId, vertexData) != null) { + throw new IllegalStateException("readFields: Already saw vertex " + + vertexId); + } + } + } + + @Override + public void setConf( + ImmutableClassesGiraphConfiguration configuration) { + conf = configuration; + } + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + return conf; + } + + @Override + public Iterator> iterator() { + return new RepresentativeVertexIterator(); + } + + /** + * Iterator that deserializes a vertex from a byte array on the fly, using + * the same representative vertex object. 
+ */ + private class RepresentativeVertexIterator implements + Iterator> { + /** Iterator to the vertex values */ + private Iterator vertexDataIterator = + vertexMap.values().iterator(); + + @Override + public boolean hasNext() { + return vertexDataIterator.hasNext(); + } + + @Override + public Vertex next() { + WritableUtils.readFieldsFromByteArrayWithSize( + vertexDataIterator.next(), representativeVertex, + useUnsafeSerialization); + return representativeVertex; + } + + @Override + public void remove() { + throw new IllegalAccessError("remove: This method is not supported."); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java new file mode 100644 index 0000000..1e483d3 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.graph.partition; + +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.log4j.Logger; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A partition store that can possibly spill to disk. + * + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data + */ +public class DiskBackedPartitionStore + extends PartitionStore { + /** Class logger. */ + private static final Logger LOG = + Logger.getLogger(DiskBackedPartitionStore.class); + /** Map of partitions kept in memory. */ + private final ConcurrentMap> + inMemoryPartitions = new ConcurrentHashMap>(); + /** Maximum number of partitions to keep in memory. */ + private int maxInMemoryPartitions; + /** Map of partitions kept out-of-core. The values are partition sizes. */ + private final ConcurrentMap onDiskPartitions = + Maps.newConcurrentMap(); + /** Directory on the local file system for storing out-of-core partitions. */ + private final String basePath; + /** Configuration. */ + private final ImmutableClassesGiraphConfiguration conf; + /** Slot for loading out-of-core partitions. */ + private Partition loadedPartition; + /** Locks for accessing and modifying partitions. 
*/ + private final ConcurrentMap partitionLocks = + Maps.newConcurrentMap(); + /** Context used to report progress */ + private final Mapper.Context context; + + /** + * Constructor. + * + * @param conf Configuration + * @param context Mapper context + */ + public DiskBackedPartitionStore( + ImmutableClassesGiraphConfiguration conf, + Mapper.Context context) { + this.conf = conf; + this.context = context; + // We must be able to hold at least one partition in memory + maxInMemoryPartitions = Math.max(1, + conf.getInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY, + GiraphConstants.MAX_PARTITIONS_IN_MEMORY_DEFAULT)); + basePath = conf.get("mapred.job.id", "Unknown Job") + + conf.get(GiraphConstants.PARTITIONS_DIRECTORY, + GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT); + } + + /** + * Get the path to the file where a partition is stored. + * + * @param partitionId The partition + * @return The path to the given partition + */ + private String getPartitionPath(Integer partitionId) { + return basePath + "/partition-" + partitionId; + } + + /** + * Create a new lock for a partition, lock it, and return it. If already + * existing, return null. + * + * @param partitionId Partition id + * @return A newly created lock, or null if already present + */ + private Lock createLock(Integer partitionId) { + Lock lock = new ReentrantLock(true); + lock.lock(); + if (partitionLocks.putIfAbsent(partitionId, lock) != null) { + return null; + } + return lock; + } + + /** + * Get the lock for a partition id. + * + * @param partitionId Partition id + * @return The lock + */ + private Lock getLock(Integer partitionId) { + return partitionLocks.get(partitionId); + } + + /** + * Write a partition to disk. 
+ * + * @param partition The partition object to write + * @throws java.io.IOException + */ + private void writePartition(Partition partition) + throws IOException { + File file = new File(getPartitionPath(partition.getId())); + file.getParentFile().mkdirs(); + file.createNewFile(); + DataOutputStream outputStream = new DataOutputStream( + new BufferedOutputStream(new FileOutputStream(file))); + for (Vertex vertex : partition) { + vertex.write(outputStream); + } + outputStream.close(); + } + + /** + * Read a partition from disk. + * + * @param partitionId Id of the partition to read + * @return The partition object + * @throws IOException + */ + private Partition readPartition(Integer partitionId) + throws IOException { + Partition partition = + conf.createPartition(partitionId, context); + File file = new File(getPartitionPath(partitionId)); + DataInputStream inputStream = new DataInputStream( + new BufferedInputStream(new FileInputStream(file))); + int numVertices = onDiskPartitions.get(partitionId); + for (int i = 0; i < numVertices; ++i) { + Vertex vertex = conf.createVertex(); + vertex.readFields(inputStream); + partition.putVertex(vertex); + } + inputStream.close(); + file.delete(); + return partition; + } + + /** + * Append some vertices of another partition to an out-of-core partition. + * + * @param partition Partition to add + * @throws IOException + */ + private void appendPartitionOutOfCore(Partition partition) + throws IOException { + File file = new File(getPartitionPath(partition.getId())); + DataOutputStream outputStream = new DataOutputStream( + new BufferedOutputStream(new FileOutputStream(file, true))); + for (Vertex vertex : partition) { + vertex.write(outputStream); + } + outputStream.close(); + } + + /** + * Load an out-of-core partition in memory. 
+ * + * @param partitionId Partition id + */ + private void loadPartition(Integer partitionId) { + if (loadedPartition != null) { + if (loadedPartition.getId() == partitionId) { + return; + } + if (LOG.isInfoEnabled()) { + LOG.info("loadPartition: moving partition " + loadedPartition.getId() + + " out of core with size " + loadedPartition.getVertexCount()); + } + try { + writePartition(loadedPartition); + onDiskPartitions.put(loadedPartition.getId(), + (int) loadedPartition.getVertexCount()); + loadedPartition = null; + } catch (IOException e) { + throw new IllegalStateException("loadPartition: failed writing " + + "partition " + loadedPartition.getId() + " to disk", e); + } + } + if (LOG.isInfoEnabled()) { + LOG.info("loadPartition: loading partition " + partitionId + + " in memory"); + } + try { + loadedPartition = readPartition(partitionId); + } catch (IOException e) { + throw new IllegalStateException("loadPartition: failed reading " + + "partition " + partitionId + " from disk"); + } + } + + /** + * Add a new partition without requiring a lock. 
+ * + * @param partition Partition to be added + */ + private void addPartitionNoLock(Partition partition) { + synchronized (inMemoryPartitions) { + if (inMemoryPartitions.size() + 1 < maxInMemoryPartitions) { + inMemoryPartitions.put(partition.getId(), partition); + + return; + } + } + try { + writePartition(partition); + onDiskPartitions.put(partition.getId(), + (int) partition.getVertexCount()); + } catch (IOException e) { + throw new IllegalStateException("addPartition: failed writing " + + "partition " + partition.getId() + "to disk"); + } + } + + @Override + public void addPartition(Partition partition) { + if (inMemoryPartitions.containsKey(partition.getId())) { + Partition existingPartition = + inMemoryPartitions.get(partition.getId()); + existingPartition.addPartition(partition); + } else if (onDiskPartitions.containsKey(partition.getId())) { + Lock lock = getLock(partition.getId()); + lock.lock(); + if (loadedPartition != null && loadedPartition.getId() == + partition.getId()) { + loadedPartition.addPartition(partition); + } else { + try { + appendPartitionOutOfCore(partition); + onDiskPartitions.put(partition.getId(), + onDiskPartitions.get(partition.getId()) + + (int) partition.getVertexCount()); + } catch (IOException e) { + throw new IllegalStateException("addPartition: failed " + + "writing vertices to partition " + partition.getId() + " on disk", + e); + } + } + lock.unlock(); + } else { + Lock lock = createLock(partition.getId()); + if (lock != null) { + addPartitionNoLock(partition); + lock.unlock(); + } else { + // Another thread is already creating the partition, + // so we make sure it's done before repeating the call. 
+ lock = getLock(partition.getId()); + lock.lock(); + lock.unlock(); + addPartition(partition); + } + } + } + + @Override + public Partition getPartition(Integer partitionId) { + if (inMemoryPartitions.containsKey(partitionId)) { + return inMemoryPartitions.get(partitionId); + } else if (onDiskPartitions.containsKey(partitionId)) { + loadPartition(partitionId); + return loadedPartition; + } else { + throw new IllegalStateException("getPartition: partition " + + partitionId + " does not exist"); + } + } + + @Override + public Partition removePartition(Integer partitionId) { + partitionLocks.remove(partitionId); + if (onDiskPartitions.containsKey(partitionId)) { + Partition partition; + if (loadedPartition != null && loadedPartition.getId() == partitionId) { + partition = loadedPartition; + loadedPartition = null; + } else { + try { + partition = readPartition(partitionId); + } catch (IOException e) { + throw new IllegalStateException("removePartition: failed reading " + + "partition " + partitionId + " from disk", e); + } + } + onDiskPartitions.remove(partitionId); + return partition; + } else { + return inMemoryPartitions.remove(partitionId); + } + } + + @Override + public void deletePartition(Integer partitionId) { + partitionLocks.remove(partitionId); + if (inMemoryPartitions.containsKey(partitionId)) { + inMemoryPartitions.remove(partitionId); + } else { + if (loadedPartition != null && loadedPartition.getId() == partitionId) { + loadedPartition = null; + } else { + File file = new File(getPartitionPath(partitionId)); + file.delete(); + } + onDiskPartitions.remove(partitionId); + } + } + + @Override + public boolean hasPartition(Integer partitionId) { + return partitionLocks.containsKey(partitionId); + } + + @Override + public Iterable getPartitionIds() { + return Iterables.concat(inMemoryPartitions.keySet(), + onDiskPartitions.keySet()); + } + + @Override + public int getNumPartitions() { + return partitionLocks.size(); + } + +} 
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java new file mode 100644 index 0000000..a7ac84b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph.partition; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Defines the partitioning framework for this application. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message value + */ +@SuppressWarnings("rawtypes") +public interface GraphPartitionerFactory extends + ImmutableClassesGiraphConfigurable { + /** + * Create the {@link MasterGraphPartitioner} used by the master. 
+ * Instantiated once by the master and reused. + * + * @return Instantiated master graph partitioner + */ + MasterGraphPartitioner createMasterGraphPartitioner(); + + /** + * Create the {@link WorkerGraphPartitioner} used by the worker. + * Instantiated once by every worker and reused. + * + * @return Instantiated worker graph partitioner + */ + WorkerGraphPartitioner createWorkerGraphPartitioner(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java new file mode 100644 index 0000000..6724e50 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.graph.partition; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.WorkerInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +/** + * Master will execute a hash based partitioning. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message value + */ +@SuppressWarnings("rawtypes") +public class HashMasterPartitioner implements + MasterGraphPartitioner { + /** Multiplier for the current workers squared */ + public static final String PARTITION_COUNT_MULTIPLIER = + "hash.masterPartitionCountMultipler"; + /** Default mulitplier for current workers squared */ + public static final float DEFAULT_PARTITION_COUNT_MULTIPLIER = 1.0f; + /** Overrides default partition count calculation if not -1 */ + public static final String USER_PARTITION_COUNT = + "hash.userPartitionCount"; + /** Default user partition count */ + public static final int DEFAULT_USER_PARTITION_COUNT = -1; + /** Class logger */ + private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class); + /** + * ZooKeeper has a limit of the data in a single znode of 1 MB and + * each entry can go be on the average somewhat more than 300 bytes + */ + private static final int MAX_PARTTIONS = 1024 * 1024 / 350; + /** Provided configuration */ + private ImmutableClassesGiraphConfiguration conf; + /** Specified partition count (overrides calculation) */ + private final int userPartitionCount; + /** Partition count (calculated in createInitialPartitionOwners) */ + private int partitionCount = -1; + /** Save the last generated partition owner list */ + private List partitionOwnerList; + + /** + * Constructor. + * + *@param conf Configuration used. 
+ */ + public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) { + this.conf = conf; + userPartitionCount = conf.getInt(USER_PARTITION_COUNT, + DEFAULT_USER_PARTITION_COUNT); + } + + @Override + public Collection createInitialPartitionOwners( + Collection availableWorkerInfos, int maxWorkers) { + if (availableWorkerInfos.isEmpty()) { + throw new IllegalArgumentException( + "createInitialPartitionOwners: No available workers"); + } + List ownerList = new ArrayList(); + Iterator workerIt = availableWorkerInfos.iterator(); + if (userPartitionCount == DEFAULT_USER_PARTITION_COUNT) { + float multiplier = conf.getFloat( + PARTITION_COUNT_MULTIPLIER, + DEFAULT_PARTITION_COUNT_MULTIPLIER); + partitionCount = + Math.max((int) (multiplier * availableWorkerInfos.size() * + availableWorkerInfos.size()), + 1); + } else { + partitionCount = userPartitionCount; + } + if (LOG.isInfoEnabled()) { + LOG.info("createInitialPartitionOwners: Creating " + + partitionCount + ", default would have been " + + (availableWorkerInfos.size() * + availableWorkerInfos.size()) + " partitions."); + } + if (partitionCount > MAX_PARTTIONS) { + LOG.warn("createInitialPartitionOwners: " + + "Reducing the partitionCount to " + MAX_PARTTIONS + + " from " + partitionCount); + partitionCount = MAX_PARTTIONS; + } + + for (int i = 0; i < partitionCount; ++i) { + PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next()); + if (!workerIt.hasNext()) { + workerIt = availableWorkerInfos.iterator(); + } + ownerList.add(owner); + } + this.partitionOwnerList = ownerList; + return ownerList; + } + + @Override + public Collection getCurrentPartitionOwners() { + return partitionOwnerList; + } + + /** + * Subclasses can set the partition owner list. + * + * @param partitionOwnerList New partition owner list. 
+ */ + protected void setPartitionOwnerList(List + partitionOwnerList) { + this.partitionOwnerList = partitionOwnerList; + } + + @Override + public Collection generateChangedPartitionOwners( + Collection allPartitionStatsList, + Collection availableWorkerInfos, + int maxWorkers, + long superstep) { + return PartitionBalancer.balancePartitionsAcrossWorkers( + conf, + partitionOwnerList, + allPartitionStatsList, + availableWorkerInfos); + } + + @Override + public PartitionStats createPartitionStats() { + return new PartitionStats(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java new file mode 100644 index 0000000..69e7a5e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.graph.partition; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Divides the vertices into partitions by their hash code using a simple + * round-robin hash for great balancing if given a random hash code. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message value + */ +@SuppressWarnings("rawtypes") +public class HashPartitionerFactory + implements GraphPartitionerFactory { + /** Saved configuration */ + private ImmutableClassesGiraphConfiguration conf; + + @Override + public MasterGraphPartitioner createMasterGraphPartitioner() { + return new HashMasterPartitioner(getConf()); + } + + @Override + public WorkerGraphPartitioner createWorkerGraphPartitioner() { + return new HashWorkerPartitioner(); + } + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + return conf; + } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration conf) { + this.conf = conf; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java new file mode 100644 index 0000000..df6457b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph.partition; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Divides the vertices into partitions by their hash code using ranges of the + * hash space. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message value + */ +@SuppressWarnings("rawtypes") +public class HashRangePartitionerFactory + implements GraphPartitionerFactory { + /** Saved configuration */ + private ImmutableClassesGiraphConfiguration conf; + + @Override + public MasterGraphPartitioner createMasterGraphPartitioner() { + return new HashMasterPartitioner(getConf()); + } + + @Override + public WorkerGraphPartitioner createWorkerGraphPartitioner() { + return new HashRangeWorkerPartitioner(); + } + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + return conf; + } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration conf) { + this.conf = conf; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java 
b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java new file mode 100644 index 0000000..ea2cf66 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph.partition; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.primitives.UnsignedInts; + +/** + * Implements range-based partitioning from the id hash code. The hash code + * is read as an unsigned 32-bit value and the unsigned hash space is split + * into contiguous ranges of (nearly) equal size, one per partition owner. + * + * @param <I> Vertex index value + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message value + */ +@SuppressWarnings("rawtypes") +public class HashRangeWorkerPartitioner<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + extends HashWorkerPartitioner<I, V, E, M> { + /** + * A transformed hashCode() must be strictly smaller than this. 
Equals + * 2^32, one more than the largest unsigned 32-bit value. + */ + private static final long HASH_LIMIT = 2L * Integer.MAX_VALUE + 2L; + + @Override + public PartitionOwner getPartitionOwner(I vertexId) { + long unsignedHashCode = UnsignedInts.toLong(vertexId.hashCode()); + // index = floor(unsignedHashCode * size / HASH_LIMIT): an unsignedHashCode + // of 0 yields index 0 and one of HASH_LIMIT - 1 yields size - 1, so the + // index always falls in [0, size). The product stays below 2^63 because + // unsignedHashCode < 2^32 and size < 2^31, so it cannot overflow a long.
+ int index = (int) + ((unsignedHashCode * partitionOwnerList.size()) / HASH_LIMIT); + return partitionOwnerList.get(index); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java new file mode 100644 index 0000000..a76f803 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.giraph.graph.partition; + +import com.google.common.collect.Lists; +import org.apache.giraph.graph.WorkerInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Implements hash-based partitioning from the id hash code. + * + * @param <I> Vertex index value + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message value + */ +@SuppressWarnings("rawtypes") +public class HashWorkerPartitioner<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + implements WorkerGraphPartitioner<I, V, E, M> { + /** + * Mapping of the vertex ids to {@link PartitionOwner}. + */ + protected List<PartitionOwner> partitionOwnerList = + Lists.newArrayList(); + + @Override + public PartitionOwner createPartitionOwner() { + return new BasicPartitionOwner(); + } + + /** + * Picks the owner at index |hashCode % size|. Taking the remainder before + * Math.abs keeps the index non-negative even for Integer.MIN_VALUE. + * Requires a non-empty owner list (otherwise the modulo throws + * ArithmeticException). + * + * @param vertexId Vertex id to locate + * @return Owner of the partition the id hashes to + */ + @Override + public PartitionOwner getPartitionOwner(I vertexId) { + return partitionOwnerList.get( + Math.abs(vertexId.hashCode() % partitionOwnerList.size())); + } + + @Override + public Collection<PartitionStats> finalizePartitionStats( + Collection<PartitionStats> workerPartitionStats, + PartitionStore<I, V, E, M> partitionStore) { + // No modification necessary + return workerPartitionStats; + } + + /** + * Replace the known partition owners with the master's assignment and work + * out which workers this worker must wait on (partitions moving to it) and + * which partitions it must send (partitions moving away from it). 
+ * + * @param myWorkerInfo This worker + * @param masterSetPartitionOwners Partition owners set by the master + * @param partitionStore Local partition store (not used here) + * @return Exchange describing dependencies and partitions to send + */ + @Override + public PartitionExchange updatePartitionOwners( + WorkerInfo myWorkerInfo, + Collection<? extends PartitionOwner> masterSetPartitionOwners, + PartitionStore<I, V, E, M> partitionStore) { + partitionOwnerList.clear(); + partitionOwnerList.addAll(masterSetPartitionOwners); + + Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>(); + Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap = + new HashMap<WorkerInfo, List<Integer>>(); + for (PartitionOwner partitionOwner : masterSetPartitionOwners) { + if (partitionOwner.getPreviousWorkerInfo() == null) { + continue; + } else if (partitionOwner.getWorkerInfo().equals( + myWorkerInfo) && + partitionOwner.getPreviousWorkerInfo().equals( + myWorkerInfo)) { + throw new IllegalStateException( + "updatePartitionOwners: Impossible to have the same " +
+ "previous and current worker info " + partitionOwner + + " as me " + myWorkerInfo); + } else if (partitionOwner.getWorkerInfo().equals(myWorkerInfo)) { + // Partition is moving to me: I depend on its previous owner + dependentWorkerSet.add(partitionOwner.getPreviousWorkerInfo()); + } else if (partitionOwner.getPreviousWorkerInfo().equals( + myWorkerInfo)) { + // Partition is moving away from me: queue its id for the new owner + List<Integer> partitionIdList = + workerPartitionOwnerMap.get(partitionOwner.getWorkerInfo()); + if (partitionIdList == null) { + partitionIdList = new ArrayList<Integer>(); + workerPartitionOwnerMap.put(partitionOwner.getWorkerInfo(), + partitionIdList); + } + partitionIdList.add(partitionOwner.getPartitionId()); + } + } + + return new PartitionExchange(dependentWorkerSet, + workerPartitionOwnerMap); + } + + @Override + public Collection<? extends PartitionOwner> getPartitionOwners() { + return partitionOwnerList; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java new file mode 100644 index 0000000..e911303 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph.partition; + +import java.util.Collection; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.giraph.graph.WorkerInfo; + +/** + * Determines how to divide the graph into partitions, how to manipulate + * partitions and then how to assign those partitions to workers. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message value + */ +@SuppressWarnings("rawtypes") +public interface MasterGraphPartitioner { + /** + * Set some initial partition owners for the graph. Guaranteed to be called + * prior to the graph being loaded (initial or restart). + * + * @param availableWorkerInfos Workers available for partition assignment + * @param maxWorkers Maximum number of workers + * @return Collection of generated partition owners. + */ + Collection createInitialPartitionOwners( + Collection availableWorkerInfos, int maxWorkers); + + /** + * After the worker stats have been merged to a single list, the master can + * use this information to send commands to the workers for any + * {@link Partition} changes. This protocol is specific to the + * {@link MasterGraphPartitioner} implementation. + * + * @param allPartitionStatsList All partition stats from all workers. 
+ * @param availableWorkers Workers available for partition assignment + * @param maxWorkers Maximum number of workers + * @param superstep Partition owners will be set for this superstep + * @return Collection of {@link PartitionOwner} objects that changed from + * the previous superstep, empty list if no change. + */ + Collection generateChangedPartitionOwners( + Collection allPartitionStatsList, + Collection availableWorkers, + int maxWorkers, + long superstep); + + /** + * Get current partition owners at this time. + * + * @return Collection of current {@link PartitionOwner} objects + */ + Collection getCurrentPartitionOwners(); + + /** + * Instantiate the {@link PartitionStats} implementation used to read the + * worker stats + * + * @return Instantiated {@link PartitionStats} object + */ + PartitionStats createPartitionStats(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/Partition.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/Partition.java new file mode 100644 index 0000000..b0f156f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/Partition.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph.partition; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import org.apache.hadoop.util.Progressable; + +/** + * A generic container that stores vertices. Vertex ids will map to exactly + * one partition. + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + */ +@SuppressWarnings("rawtypes") +public interface Partition<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + extends Writable, ImmutableClassesGiraphConfigurable<I, V, E, M>, + Iterable<Vertex<I, V, E, M>> { + /** + * Initialize the partition. Guaranteed to be called before used. + * + * @param partitionId Partition id + * @param progressable Progressable to call progress + */ + void initialize(int partitionId, Progressable progressable); + + /** + * Get the vertex for this vertex index. + * + * @param vertexIndex Vertex index to search for + * @return Vertex if it exists, null otherwise + */ + Vertex<I, V, E, M> getVertex(I vertexIndex); + + /** + * Put a vertex into the Partition + * + * @param vertex Vertex to put in the Partition + * @return Old vertex (i.e. null if none existed prior) + */ + Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex); + + /** + * Remove a vertex from the Partition + * + * @param vertexIndex Vertex index to remove + * @return The removed vertex.
+ */ + Vertex<I, V, E, M> removeVertex(I vertexIndex); + + /** + * Add a partition's vertices + * + * @param partition Partition to add + */ + void addPartition(Partition<I, V, E, M> partition); + + /** + * Get the number of vertices in this partition + * + * @return Number of vertices + */ + long getVertexCount(); + + /** + * Get the number of edges in this partition. + * + * @return Number of edges. + */ + long getEdgeCount(); + + /** + * Get the partition id. + * + * @return Id of this partition. + */ + int getId(); + + /** + * Set the partition id. + * + * @param id Id of this partition + */ + void setId(int id); + + /** + * Set the progressable used to report progress. + * + * @param progressable Progressable + */ + void setProgressable(Progressable progressable); + + /** + * Save potentially modified vertex back to the partition. + * + * @param vertex Vertex to save + */ + void saveVertex(Vertex<I, V, E, M> vertex); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java new file mode 100644 index 0000000..2d1c2a2 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph.partition; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; + +import org.apache.giraph.graph.WorkerInfo; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; + +/** + * Helper class for balancing partitions across a set of workers. + */ +public class PartitionBalancer { + /** Partition balancing algorithm */ + public static final String PARTITION_BALANCE_ALGORITHM = + "hash.partitionBalanceAlgorithm"; + /** No rebalancing during the supersteps */ + public static final String STATIC_BALANCE_ALGORITHM = + "static"; + /** Rebalance across supersteps by edges */ + public static final String EGDE_BALANCE_ALGORITHM = + "edges"; + /** Rebalance across supersteps by vertices */ + public static final String VERTICES_BALANCE_ALGORITHM = + "vertices"; + /** Class logger */ + private static Logger LOG = Logger.getLogger(PartitionBalancer.class); + + /** + * What value to balance partitions with? Edges, vertices? + */ + private enum BalanceValue { + /** Not chosen */ + UNSET, + /** Balance with edges */ + EDGES, + /** Balance with vertices */ + VERTICES + } + + /** + * Do not construct this class. + */ + private PartitionBalancer() { } + + /** + * Get the value used to balance. + * + * @param partitionStat Stats of this partition. + * @param balanceValue Type of the value to balance. + * @return Balance value. 
+ */ + private static long getBalanceValue(PartitionStats partitionStat, + BalanceValue balanceValue) { + switch (balanceValue) { + case EDGES: + return partitionStat.getEdgeCount(); + case VERTICES: + return partitionStat.getVertexCount(); + default: + throw new IllegalArgumentException( + "getBalanceValue: Illegal balance value " + balanceValue); + } + } + + /** + * Used to sort the partition owners from lowest value to highest value + */ + private static class PartitionOwnerComparator implements + Comparator { + /** Map of owner to stats */ + private final Map ownerStatMap; + /** Value type to compare on */ + private final BalanceValue balanceValue; + + + /** + * Only constructor. + * + * @param ownerStatMap Map of owners to stats. + * @param balanceValue Value to balance with. + */ + public PartitionOwnerComparator( + Map ownerStatMap, + BalanceValue balanceValue) { + this.ownerStatMap = ownerStatMap; + this.balanceValue = balanceValue; + } + + @Override + public int compare(PartitionOwner owner1, PartitionOwner owner2) { + return (int) + (getBalanceValue(ownerStatMap.get(owner1), balanceValue) - + getBalanceValue(ownerStatMap.get(owner2), balanceValue)); + } + } + + /** + * Structure to keep track of how much value a {@link WorkerInfo} has + * been assigned. + */ + private static class WorkerInfoAssignments implements + Comparable { + /** Worker info associated */ + private final WorkerInfo workerInfo; + /** Balance value */ + private final BalanceValue balanceValue; + /** Map of owner to stats */ + private final Map ownerStatsMap; + /** Current value of this object */ + private long value = 0; + + /** + * Constructor with final values. + * + * @param workerInfo Worker info for assignment. + * @param balanceValue Value used to balance. + * @param ownerStatsMap Map of owner to stats. 
+ */ + public WorkerInfoAssignments( + WorkerInfo workerInfo, + BalanceValue balanceValue, + Map ownerStatsMap) { + this.workerInfo = workerInfo; + this.balanceValue = balanceValue; + this.ownerStatsMap = ownerStatsMap; + } + + /** + * Get the total value of all partitions assigned to this worker. + * + * @return Total value of all partition assignments. + */ + public long getValue() { + return value; + } + + /** + * Assign a {@link PartitionOwner} to this {@link WorkerInfo}. + * + * @param partitionOwner PartitionOwner to assign. + */ + public void assignPartitionOwner( + PartitionOwner partitionOwner) { + value += getBalanceValue(ownerStatsMap.get(partitionOwner), + balanceValue); + if (!partitionOwner.getWorkerInfo().equals(workerInfo)) { + partitionOwner.setPreviousWorkerInfo( + partitionOwner.getWorkerInfo()); + partitionOwner.setWorkerInfo(workerInfo); + } else { + partitionOwner.setPreviousWorkerInfo(null); + } + } + + @Override + public int compareTo(WorkerInfoAssignments other) { + return (int) + (getValue() - ((WorkerInfoAssignments) other).getValue()); + } + } + + /** + * Balance the partitions with an algorithm based on a value. 
+ * + * @param conf Configuration to find the algorithm + * @param partitionOwners All the owners of all partitions + * @param allPartitionStats All the partition stats + * @param availableWorkerInfos All the available workers + * @return Balanced partition owners + */ + public static Collection balancePartitionsAcrossWorkers( + Configuration conf, + Collection partitionOwners, + Collection allPartitionStats, + Collection availableWorkerInfos) { + + String balanceAlgorithm = + conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM); + if (LOG.isInfoEnabled()) { + LOG.info("balancePartitionsAcrossWorkers: Using algorithm " + + balanceAlgorithm); + } + BalanceValue balanceValue = BalanceValue.UNSET; + if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) { + return partitionOwners; + } else if (balanceAlgorithm.equals(EGDE_BALANCE_ALGORITHM)) { + balanceValue = BalanceValue.EDGES; + } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) { + balanceValue = BalanceValue.VERTICES; + } else { + throw new IllegalArgumentException( + "balancePartitionsAcrossWorkers: Illegal balance " + + "algorithm - " + balanceAlgorithm); + } + + // Join the partition stats and partition owners by partition id + Map idStatMap = + new HashMap(); + for (PartitionStats partitionStats : allPartitionStats) { + if (idStatMap.put(partitionStats.getPartitionId(), partitionStats) != + null) { + throw new IllegalStateException( + "balancePartitionsAcrossWorkers: Duplicate partition id " + + "for " + partitionStats); + } + } + Map ownerStatsMap = + new HashMap(); + for (PartitionOwner partitionOwner : partitionOwners) { + PartitionStats partitionStats = + idStatMap.get(partitionOwner.getPartitionId()); + if (partitionStats == null) { + throw new IllegalStateException( + "balancePartitionsAcrossWorkers: Missing partition " + + "stats for " + partitionOwner); + } + if (ownerStatsMap.put(partitionOwner, partitionStats) != null) { + throw new IllegalStateException( + 
"balancePartitionsAcrossWorkers: Duplicate partition " + + "owner " + partitionOwner); + } + } + if (ownerStatsMap.size() != partitionOwners.size()) { + throw new IllegalStateException( + "balancePartitionsAcrossWorkers: ownerStats count = " + + ownerStatsMap.size() + ", partitionOwners count = " + + partitionOwners.size() + " and should match."); + } + + List workerInfoAssignmentsList = + new ArrayList(availableWorkerInfos.size()); + for (WorkerInfo workerInfo : availableWorkerInfos) { + workerInfoAssignmentsList.add( + new WorkerInfoAssignments( + workerInfo, balanceValue, ownerStatsMap)); + } + + // A simple heuristic for balancing the partitions across the workers + // using a value (edges, vertices). An improvement would be to + // take into account the already existing partition worker assignments. + // 1. Sort the partitions by size + // 2. Place the workers in a min heap sorted by their total balance + // value. + // 3. From largest partition to the smallest, take the partition + // worker at the top of the heap, add the partition to it, and + // then put it back in the heap + List partitionOwnerList = + new ArrayList(partitionOwners); + Collections.sort(partitionOwnerList, + Collections.reverseOrder( + new PartitionOwnerComparator(ownerStatsMap, balanceValue))); + PriorityQueue minQueue = + new PriorityQueue(workerInfoAssignmentsList); + for (PartitionOwner partitionOwner : partitionOwnerList) { + WorkerInfoAssignments chosenWorker = minQueue.remove(); + chosenWorker.assignPartitionOwner(partitionOwner); + minQueue.add(chosenWorker); + } + + return partitionOwnerList; + } +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java 
b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java new file mode 100644 index 0000000..1b2be9a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph.partition; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.giraph.graph.WorkerInfo; + +/** + * Describes what is required to send and wait for in a potential partition + * exchange between workers. + */ +public class PartitionExchange { + /** Workers that I am dependent on before I can continue */ + private final Set<WorkerInfo> myDependencyWorkerSet; + /** Workers that I need to send partitions to, with the partition ids */ + private final Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap; + + /** + * Only constructor. The given collections are stored as-is, not copied.
+ * + * @param myDependencyWorkerSet All the workers I must wait for + * @param sendWorkerPartitionMap Partitions I need to send to other workers + */ + public PartitionExchange( + Set<WorkerInfo> myDependencyWorkerSet, + Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap) { + this.myDependencyWorkerSet = myDependencyWorkerSet; + this.sendWorkerPartitionMap = sendWorkerPartitionMap; + } + + /** + * Get the workers that I must wait for + * + * @return Set of workers I must wait for + */ + public Set<WorkerInfo> getMyDependencyWorkerSet() { + return myDependencyWorkerSet; + } + + /** + * Get a mapping of worker to list of partition ids I need to send to. + * + * @return Mapping of worker to partition id list I will send to. + */ + public Map<WorkerInfo, List<Integer>> getSendWorkerPartitionMap() { + return sendWorkerPartitionMap; + } + + /** + * Is this worker involved in a partition exchange? Receiving or sending? + * + * @return True if needs to be involved in the exchange, false otherwise. + */ + public boolean doExchange() { + return !myDependencyWorkerSet.isEmpty() || + !sendWorkerPartitionMap.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java new file mode 100644 index 0000000..5616a8d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph.partition; + +import org.apache.giraph.graph.WorkerInfo; +import org.apache.hadoop.io.Writable; + +/** + * Metadata about ownership of a partition. + */ +public interface PartitionOwner extends Writable { + /** + * Get the partition id that maps to the relevant {@link Partition} object + * + * @return Partition id + */ + int getPartitionId(); + + /** + * Get the worker information that is currently responsible for + * the partition id. + * + * @return Owning worker information. + */ + WorkerInfo getWorkerInfo(); + + /** + * Set the current worker info. + * + * @param workerInfo Worker info responsible for partition + */ + void setWorkerInfo(WorkerInfo workerInfo); + + /** + * Get the worker information that was previously responsible for the + * partition id. + * + * @return Owning worker information or null if no previous worker info. + */ + WorkerInfo getPreviousWorkerInfo(); + + /** + * Set the previous worker info. + * + * @param workerInfo Worker info that was previously responsible for the + * partition. + */ + void setPreviousWorkerInfo(WorkerInfo workerInfo); + + /** + * If this is a restarted checkpoint, the worker will use this information + * to determine where the checkpointed partition was stored on HDFS. + * + * @return Prefix of the checkpoint HDFS files for this partition, null if + * this is not a restarted superstep. + */ + String getCheckpointFilesPrefix(); + + /** + * Set the checkpoint files prefix. Master uses this.
+ * + * @param checkpointFilesPrefix HDFS checkpoint file prefix + */ + void setCheckpointFilesPrefix(String checkpointFilesPrefix); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java new file mode 100644 index 0000000..6ee0228 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.graph.partition; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * Used to keep track of statistics of every {@link Partition}. Contains no + * actual partition data, only the statistics.
+ */ +public class PartitionStats implements Writable { + /** Id of partition to keep stats for (-1 until set) */ + private int partitionId = -1; + /** Vertices in this partition */ + private long vertexCount = 0; + /** Finished vertices in this partition */ + private long finishedVertexCount = 0; + /** Edges in this partition */ + private long edgeCount = 0; + /** Messages sent from this partition */ + private long messagesSentCount = 0; + + /** + * Default constructor for reflection. Leaves the partition id at -1 and + * all counts at 0. + */ + public PartitionStats() { } + + /** + * Constructor with the initial stats. + * + * @param partitionId Partition id. + * @param vertexCount Vertex count. + * @param finishedVertexCount Finished vertex count. + * @param edgeCount Edge count. + * @param messagesSentCount Number of messages sent + */ + public PartitionStats(int partitionId, + long vertexCount, + long finishedVertexCount, + long edgeCount, + long messagesSentCount) { + this.partitionId = partitionId; + this.vertexCount = vertexCount; + this.finishedVertexCount = finishedVertexCount; + this.edgeCount = edgeCount; + this.messagesSentCount = messagesSentCount; + } + + /** + * Set the partition id. + * + * @param partitionId New partition id. + */ + public void setPartitionId(int partitionId) { + this.partitionId = partitionId; + } + + /** + * Get partition id. + * + * @return Partition id. + */ + public int getPartitionId() { + return partitionId; + } + + /** + * Increment the vertex count by one. + */ + public void incrVertexCount() { + ++vertexCount; + } + + /** + * Get the vertex count. + * + * @return Vertex count. + */ + public long getVertexCount() { + return vertexCount; + } + + /** + * Increment the finished vertex count by one. + */ + public void incrFinishedVertexCount() { + ++finishedVertexCount; + } + + /** + * Get the finished vertex count. + * + * @return Finished vertex count. 
+ */ + public long getFinishedVertexCount() { + return finishedVertexCount; + } + + /** + * Add edges to the edge count.
+ * + * @param edgeCount Number of edges to add. + */ + public void addEdgeCount(long edgeCount) { + this.edgeCount += edgeCount; + } + + /** + * Get the edge count. + * + * @return Edge count. + */ + public long getEdgeCount() { + return edgeCount; + } + + /** + * Add messages to the messages sent count. + * + * @param messagesSentCount Number of messages to add. + */ + public void addMessagesSentCount(long messagesSentCount) { + this.messagesSentCount += messagesSentCount; + } + + /** + * Get the messages sent count. + * + * @return Messages sent count. + */ + public long getMessagesSentCount() { + return messagesSentCount; + } + /** + * Reads the fields in exactly the order {@link #write(DataOutput)} writes + * them (int id, then four longs); the two methods must stay in sync. + */ + @Override + public void readFields(DataInput input) throws IOException { + partitionId = input.readInt(); + vertexCount = input.readLong(); + finishedVertexCount = input.readLong(); + edgeCount = input.readLong(); + messagesSentCount = input.readLong(); + } + /** + * Writes the id followed by the vertex, finished vertex, edge and + * messages-sent counts; {@link #readFields(DataInput)} reads this order. 
+ */ + @Override + public void write(DataOutput output) throws IOException { + output.writeInt(partitionId); + output.writeLong(vertexCount); + output.writeLong(finishedVertexCount); + output.writeLong(edgeCount); + output.writeLong(messagesSentCount); + } + + @Override + public String toString() { + return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" + + finishedVertexCount + ",edges=" + edgeCount + ",msgsSent=" + + messagesSentCount + ")"; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java new file mode 100644 index 0000000..07f55ed --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph.partition; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Structure that stores partitions for a worker. + * + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data + */ +public abstract class PartitionStore { + + /** + * Add a new partition to the store or just the vertices from the partition + * to the old partition. + * + * @param partition Partition to add + */ + public abstract void addPartition(Partition partition); + + /** + * Get a partition. + * + * @param partitionId Partition id + * @return The requested partition + */ + public abstract Partition getPartition(Integer partitionId); + + /** + * Remove a partition and return it. + * + * @param partitionId Partition id + * @return The removed partition + */ + public abstract Partition removePartition(Integer partitionId); + + /** + * Just delete a partition + * (more efficient than {@link #removePartition(Integer partitionID)} if the + * partition is out of core). 
+ * + * @param partitionId Partition id + */ + public abstract void deletePartition(Integer partitionId); + + /** + * Whether a specific partition is present in the store. + * + * @param partitionId Partition id + * @return True iff the partition is present + */ + public abstract boolean hasPartition(Integer partitionId); + + /** + * Return the ids of all the stored partitions as an Iterable. + * + * @return The partition ids + */ + public abstract Iterable getPartitionIds(); + + /** + * Return the number of stored partitions. + * + * @return The number of partitions + */ + public abstract int getNumPartitions(); + + /** + * Whether the partition store is empty. + * + * @return True iff there are no partitions in the store + */ + public boolean isEmpty() { + return getNumPartitions() == 0; + } + + /** + * Return all the stored partitions as an Iterable. Note that this may force + * out-of-core partitions to be loaded into memory if using out-of-core. + * + * @return The partition objects + */ + public Iterable> getPartitions() { + return Iterables.transform(getPartitionIds(), + new Function>() { + @Override + public Partition apply(Integer partitionId) { + return getPartition(partitionId); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java new file mode 100644 index 0000000..5600dad --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph.partition; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.giraph.graph.VertexEdgeCount; +import org.apache.giraph.graph.WorkerInfo; +import org.apache.log4j.Logger; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Helper class for {@link Partition} related operations. + */ +public class PartitionUtils { + /** Class logger */ + private static Logger LOG = Logger.getLogger(PartitionUtils.class); + + /** + * Do not construct this object. + */ + private PartitionUtils() { } + + /** + * Compare edge counts for Entry objects. + */ + private static class EdgeCountComparator implements + Comparator> { + @Override + public int compare(Entry worker1, + Entry worker2) { + return (int) (worker1.getValue().getEdgeCount() - + worker2.getValue().getEdgeCount()); + } + } + + /** + * Compare vertex counts between a {@link WorkerInfo} and + * {@link VertexEdgeCount}. 
+ */ + private static class VertexCountComparator implements + Comparator> { + @Override + public int compare(Entry worker1, + Entry worker2) { + return (int) (worker1.getValue().getVertexCount() - + worker2.getValue().getVertexCount()); + } + } + + /** + * Check for imbalances on a per worker basis, by calculating the + * mean, high and low workers by edges and vertices. + * + * @param partitionOwnerList List of partition owners. + * @param allPartitionStats All the partition stats. + */ + public static void analyzePartitionStats( + Collection partitionOwnerList, + List allPartitionStats) { + Map idOwnerMap = + new HashMap(); + for (PartitionOwner partitionOwner : partitionOwnerList) { + if (idOwnerMap.put(partitionOwner.getPartitionId(), + partitionOwner) != null) { + throw new IllegalStateException( + "analyzePartitionStats: Duplicate partition " + + partitionOwner); + } + } + + Map workerStatsMap = Maps.newHashMap(); + VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount(); + for (PartitionStats partitionStats : allPartitionStats) { + WorkerInfo workerInfo = + idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo(); + VertexEdgeCount vertexEdgeCount = + workerStatsMap.get(workerInfo); + if (vertexEdgeCount == null) { + workerStatsMap.put( + workerInfo, + new VertexEdgeCount(partitionStats.getVertexCount(), + partitionStats.getEdgeCount())); + } else { + workerStatsMap.put( + workerInfo, + vertexEdgeCount.incrVertexEdgeCount( + partitionStats.getVertexCount(), + partitionStats.getEdgeCount())); + } + totalVertexEdgeCount = + totalVertexEdgeCount.incrVertexEdgeCount( + partitionStats.getVertexCount(), + partitionStats.getEdgeCount()); + } + + List> workerEntryList = + Lists.newArrayList(workerStatsMap.entrySet()); + + if (LOG.isInfoEnabled()) { + Collections.sort(workerEntryList, new VertexCountComparator()); + LOG.info("analyzePartitionStats: Vertices - Mean: " + + (totalVertexEdgeCount.getVertexCount() / + workerStatsMap.size()) + + ", Min: " + + 
workerEntryList.get(0).getKey() + " - " + + workerEntryList.get(0).getValue().getVertexCount() + + ", Max: " + + workerEntryList.get(workerEntryList.size() - 1).getKey() + + " - " + + workerEntryList.get(workerEntryList.size() - 1). + getValue().getVertexCount()); + Collections.sort(workerEntryList, new EdgeCountComparator()); + LOG.info("analyzePartitionStats: Edges - Mean: " + + (totalVertexEdgeCount.getEdgeCount() / + workerStatsMap.size()) + + ", Min: " + + workerEntryList.get(0).getKey() + " - " + + workerEntryList.get(0).getValue().getEdgeCount() + + ", Max: " + + workerEntryList.get(workerEntryList.size() - 1).getKey() + + " - " + + workerEntryList.get(workerEntryList.size() - 1). + getValue().getEdgeCount()); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java new file mode 100644 index 0000000..8e417ec --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph.partition; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Some functionality is provided, but this is meant for developers to + * determine the partitioning based on the actual types of data. The + * implementation of several methods are left to the developer who is trying + * to control the amount of messages sent from one worker to another. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message value + */ +@SuppressWarnings("rawtypes") +public abstract class RangeMasterPartitioner implements + MasterGraphPartitioner { + @Override + public PartitionStats createPartitionStats() { + return new RangePartitionStats(); + } +}