Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BD2921006B for ; Wed, 8 Jan 2014 20:04:43 +0000 (UTC) Received: (qmail 70922 invoked by uid 500); 8 Jan 2014 20:04:43 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 70887 invoked by uid 500); 8 Jan 2014 20:04:42 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 70880 invoked by uid 99); 8 Jan 2014 20:04:42 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Jan 2014 20:04:42 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 56A2054E39; Wed, 8 Jan 2014 20:04:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: maja@apache.org To: commits@giraph.apache.org Message-Id: <487c4631bb324628ae9c985735003add@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: updated refs/heads/trunk to cdb49fd Date: Wed, 8 Jan 2014 20:04:42 +0000 (UTC) Updated Branches: refs/heads/trunk 4ce0f6a0d -> cdb49fd5f GIRAPH-785: Improve GraphPartitionerFactory usage (ikabiljo via majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/cdb49fd5 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/cdb49fd5 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/cdb49fd5 Branch: refs/heads/trunk Commit: cdb49fd5f76027a4a66930e6b266ba79266c9852 Parents: 4ce0f6a Author: Maja Kabiljo Authored: Wed Jan 8 12:04:23 2014 -0800 Committer: Maja Kabiljo Committed: Wed Jan 8 12:04:23 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/partition/PartitionUtils.java | 15 ++- .../partition/RangeMasterPartitioner.java | 42 ------- .../giraph/partition/RangePartitionOwner.java | 90 -------------- .../giraph/partition/RangePartitionStats.java | 68 ----------- .../partition/RangePartitionerFactory.java | 42 ------- .../apache/giraph/partition/RangeSplitHint.java | 72 ----------- .../partition/RangeWorkerPartitioner.java | 77 ------------ .../SimpleIntRangePartitionerFactory.java | 45 +++---- .../SimpleLongRangePartitionerFactory.java | 43 +++---- .../partition/SimpleMasterPartitioner.java | 106 ++++++++++++++++ .../partition/SimplePartitionerFactory.java | 122 +++++++++++++++++++ .../partition/SimpleRangeMasterPartitioner.java | 116 ------------------ .../partition/SimpleRangeWorkerPartitioner.java | 107 ---------------- .../partition/SimpleWorkerPartitioner.java | 85 +++++++++++++ .../SimpleRangePartitionFactoryTest.java | 117 ++++++++++++++++++ 16 files changed, 473 insertions(+), 676 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index d0259e3..b29aa66 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-785: Improve GraphPartitionerFactory usage (ikabiljo via majakabiljo) + GIRAPH-815: Exclude dependency and duplicate finder checks to profile we do not check (aching) GIRAPH-798: Upgrade Giraph to Java7 and fix all dependencies (aching) http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java index b055f4d..68bc2de 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java @@ -195,11 +195,20 @@ public class PartitionUtils { } int maxPartitions = getMaxPartitions(conf); if (partitionCount > maxPartitions) { + // try to keep partitionCount divisible by number of workers + // in order to keep the balance + int reducedPartitions = (maxPartitions / availableWorkerInfos.size()) * + availableWorkerInfos.size(); + if (reducedPartitions == 0) { + reducedPartitions = maxPartitions; + } LOG.warn("computePartitionCount: " + - "Reducing the partitionCount to " + maxPartitions + - " from " + partitionCount); - partitionCount = maxPartitions; + "Reducing the partitionCount to " + reducedPartitions + + " from " + partitionCount + " because of " + maxPartitions + + " limit"); + partitionCount = reducedPartitions; } + return partitionCount; } http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java deleted file mode 100644 index 3911a95..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.partition; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Some functionality is provided, but this is meant for developers to - * determine the partitioning based on the actual types of data. The - * implementation of several methods are left to the developer who is trying - * to control the amount of messages sent from one worker to another. - * - * @param Vertex index value - * @param Vertex value - * @param Edge value - */ -@SuppressWarnings("rawtypes") -public abstract class RangeMasterPartitioner implements - MasterGraphPartitioner { - @Override - public PartitionStats createPartitionStats() { - return new RangePartitionStats(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java deleted file mode 100644 index e7e03dc..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.partition; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Map; - -import org.apache.giraph.worker.WorkerInfo; -import org.apache.hadoop.io.WritableComparable; - -/** - * Added the max key index in to the {@link PartitionOwner}. Also can provide - * a split hint if desired. - * - * @param Vertex index type - */ -@SuppressWarnings("rawtypes") -public class RangePartitionOwner - extends BasicPartitionOwner { - /** Max index for this partition */ - private I maxIndex; - - /** - * Default constructor. - */ - public RangePartitionOwner() { } - - /** - * Constructor with the max index. - * - * @param maxIndex Max index of this partition. - */ - public RangePartitionOwner(I maxIndex) { - this.maxIndex = maxIndex; - } - - /** - * Get the maximum index of this partition owner. - * - * @return Maximum index. - */ - public I getMaxIndex() { - return maxIndex; - } - - @Override - public void readFields(DataInput input) throws IOException { - super.readFields(input); - maxIndex = (I) getConf().createVertexId(); - maxIndex.readFields(input); - } - - @Override - public void write(DataOutput output) throws IOException { - super.write(output); - maxIndex.write(output); - } - - @Override - public void writeWithWorkerIds(DataOutput output) throws IOException { - super.writeWithWorkerIds(output); - maxIndex.write(output); - } - - @Override - public void readFieldsWithWorkerIds(DataInput input, - Map workerInfoMap) throws IOException { - super.readFieldsWithWorkerIds(input, workerInfoMap); - maxIndex = (I) getConf().createVertexId(); - maxIndex.readFields(input); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java deleted file mode 100644 index 73af816..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.partition; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.WritableComparable; - -/** - * Same as {@link PartitionStats}, but also includes the hint for range-based - * partitioning. - * - * @param Vertex index type - */ -@SuppressWarnings("rawtypes") -public class RangePartitionStats - extends PartitionStats { - /** Can be null if no hint, otherwise a splitting hint */ - private RangeSplitHint hint; - - /** - * Get the range split hint (if any) - * - * @return Hint of how to split the range if desired, null otherwise - */ - public RangeSplitHint getRangeSplitHint() { - return hint; - } - - @Override - public void readFields(DataInput input) throws IOException { - super.readFields(input); - boolean hintExists = input.readBoolean(); - if (hintExists) { - hint = new RangeSplitHint(); - hint.readFields(input); - } else { - hint = null; - } - } - - @Override - public void write(DataOutput output) throws IOException { - super.write(output); - output.writeBoolean(hint != null); - if (hint != null) { - hint.write(output); - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java deleted file mode 100644 index 2ec4d4a..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.partition; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Range partitioning will split the vertices by a key range based on a generic - * type. This allows vertices that have some locality with the vertex ids - * to reduce the amount of messages sent. The tradeoffs are that - * range partitioning is more susceptible to hot spots if the keys - * are not randomly distributed. Another negative is the user must implement - * some of the functionality around how to split the key range. - * - * See {@link RangeWorkerPartitioner} - * - * @param Vertex index value - * @param Vertex value - * @param Edge value - */ -@SuppressWarnings("rawtypes") -public abstract class RangePartitionerFactory - implements GraphPartitionerFactory { -} http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java deleted file mode 100644 index 4317944..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.partition; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Hint to the {@link RangeMasterPartitioner} about how a - * {@link RangePartitionOwner} can be split. - * - * @param Vertex index to split around - */ -@SuppressWarnings("rawtypes") -public class RangeSplitHint - implements Writable, ImmutableClassesGiraphConfigurable { - /** Hinted split index */ - private I splitIndex; - /** Number of vertices in this range before the split */ - private long preSplitVertexCount; - /** Number of vertices in this range after the split */ - private long postSplitVertexCount; - /** Configuration */ - private ImmutableClassesGiraphConfiguration conf; - - @Override - public void readFields(DataInput input) throws IOException { - splitIndex = conf.createVertexId(); - splitIndex.readFields(input); - preSplitVertexCount = input.readLong(); - postSplitVertexCount = input.readLong(); - } - - @Override - public void write(DataOutput output) throws IOException { - splitIndex.write(output); - output.writeLong(preSplitVertexCount); - output.writeLong(postSplitVertexCount); - } - - @Override - public ImmutableClassesGiraphConfiguration getConf() { - return conf; - } - - @Override - public void setConf(ImmutableClassesGiraphConfiguration conf) { - this.conf = conf; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java deleted file mode 100644 index cbcd753..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.partition; - -import java.util.Collection; -import java.util.NavigableMap; -import java.util.TreeMap; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Range partitioning will split the vertices by a key range based on a generic - * type. This allows vertices that have some locality with the vertex ids - * to reduce the amount of messages sent. The tradeoffs are that - * range partitioning is more susceptible to hot spots if the keys - * are not randomly distributed. Another negative is the user must implement - * some of the functionality around how to split the key range. - * - * Note: This implementation is incomplete, the developer must implement the - * various methods based on their index type. - * - * @param Vertex index value - * @param Vertex value - * @param Edge value - */ -@SuppressWarnings("rawtypes") -public abstract class RangeWorkerPartitioner implements - WorkerGraphPartitioner { - /** Mapping of the vertex ids to the {@link PartitionOwner} */ - protected NavigableMap> vertexRangeMap = - new TreeMap>(); - - @Override - public PartitionOwner createPartitionOwner() { - return new RangePartitionOwner(); - } - - @Override - public PartitionOwner getPartitionOwner(I vertexId) { - // Find the partition owner based on the maximum partition id. - // If the vertex id exceeds any of the maximum partition ids, give - // it to the last one - if (vertexId == null) { - throw new IllegalArgumentException( - "getPartitionOwner: Illegal null vertex id"); - } - I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId); - if (maxVertexIndex == null) { - return vertexRangeMap.lastEntry().getValue(); - } else { - return vertexRangeMap.get(vertexId); - } - } - - @Override - public Collection getPartitionOwners() { - return vertexRangeMap.values(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java index 7aee84c..8ab692f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java @@ -31,47 +31,32 @@ import org.apache.hadoop.io.Writable; * @param Vertex value type * @param Edge value type */ -public class SimpleIntRangePartitionerFactory - implements GraphPartitionerFactory { - /** Configuration. */ - private ImmutableClassesGiraphConfiguration conf; +public class SimpleIntRangePartitionerFactory + + extends SimplePartitionerFactory { + /** Vertex key space size. */ - private long keySpaceSize; + private int keySpaceSize; @Override - public MasterGraphPartitioner - createMasterGraphPartitioner() { - return new SimpleRangeMasterPartitioner(conf); + protected int getPartition(IntWritable id, int partitionCount) { + return getPartitionInRange(id.get(), keySpaceSize, partitionCount); } @Override - public WorkerGraphPartitioner - createWorkerGraphPartitioner() { - return new SimpleRangeWorkerPartitioner( - keySpaceSize) { - @Override - protected long vertexKeyFromId(IntWritable id) { - // The modulo is just a safeguard in case keySpaceSize is incorrect. - return id.get() % keySpaceSize; - } - }; + protected int getWorker(int partition, int partitionCount, int workerCount) { + return getPartitionInRange(partition, partitionCount, workerCount); } @Override public void setConf(ImmutableClassesGiraphConfiguration conf) { - this.conf = conf; - keySpaceSize = conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, - -1); + super.setConf(conf); + keySpaceSize = + conf.getInt(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, -1); if (keySpaceSize == -1) { - throw new IllegalStateException("Need to specify " + GiraphConstants - .PARTITION_VERTEX_KEY_SPACE_SIZE + " when using " + - "SimpleRangePartitioner"); + throw new IllegalStateException("Need to specify " + + GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE + + " when using SimpleIntRangePartitionerFactory"); } } - - @Override - public ImmutableClassesGiraphConfiguration getConf() { - return conf; - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java index 64efde9..2989598 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java @@ -31,47 +31,32 @@ import org.apache.hadoop.io.Writable; * @param Vertex value type * @param Edge value type */ -public class SimpleLongRangePartitionerFactory - implements GraphPartitionerFactory { - /** Configuration. */ - private ImmutableClassesGiraphConfiguration conf; +public class SimpleLongRangePartitionerFactory + + extends SimplePartitionerFactory { + /** Vertex key space size. */ private long keySpaceSize; @Override - public MasterGraphPartitioner - createMasterGraphPartitioner() { - return new SimpleRangeMasterPartitioner(conf); + protected int getPartition(LongWritable id, int partitionCount) { + return getPartitionInRange(id.get(), keySpaceSize, partitionCount); } @Override - public WorkerGraphPartitioner - createWorkerGraphPartitioner() { - return new SimpleRangeWorkerPartitioner( - keySpaceSize) { - @Override - protected long vertexKeyFromId(LongWritable id) { - // The modulo is just a safeguard in case keySpaceSize is incorrect. - return id.get() % keySpaceSize; - } - }; + protected int getWorker(int partition, int partitionCount, int workerCount) { + return getPartitionInRange(partition, partitionCount, workerCount); } @Override public void setConf(ImmutableClassesGiraphConfiguration conf) { - this.conf = conf; - keySpaceSize = conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, - -1); + super.setConf(conf); + keySpaceSize = + conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, -1); if (keySpaceSize == -1) { - throw new IllegalStateException("Need to specify " + GiraphConstants - .PARTITION_VERTEX_KEY_SPACE_SIZE + " when using " + - "SimpleRangePartitioner"); + throw new IllegalStateException("Need to specify " + + GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE + + " when using SimpleLongRangePartitionerFactory"); } } - - @Override - public ImmutableClassesGiraphConfiguration getConf() { - return conf; - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java new file mode 100644 index 0000000..f128f34 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.partition; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +/** + * Abstracts and implements all MasterGraphPartitioner logic on top of a single + * user function - getWorkerIndex. + * + * @param Vertex id type + * @param Vertex value type + * @param Edge value type + */ +public abstract class SimpleMasterPartitioner + implements MasterGraphPartitioner { + /** Class logger */ + private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class); + /** Provided configuration */ + private ImmutableClassesGiraphConfiguration conf; + /** Save the last generated partition owner list */ + private List partitionOwnerList; + + /** + * Constructor. + * + * @param conf + * Configuration used. + */ + public SimpleMasterPartitioner(ImmutableClassesGiraphConfiguration conf) { + this.conf = conf; + } + + @Override + public Collection createInitialPartitionOwners( + Collection availableWorkerInfos, int maxWorkers) { + int partitionCount = PartitionUtils.computePartitionCount( + availableWorkerInfos, maxWorkers, conf); + ArrayList workerList = + new ArrayList(availableWorkerInfos); + + partitionOwnerList = new ArrayList(); + for (int i = 0; i < partitionCount; i++) { + partitionOwnerList.add(new BasicPartitionOwner(i, workerList.get( + getWorkerIndex(i, partitionCount, workerList.size())))); + } + + return partitionOwnerList; + } + + @Override + public Collection generateChangedPartitionOwners( + Collection allPartitionStatsList, + Collection availableWorkers, + int maxWorkers, + long superstep) { + return PartitionBalancer.balancePartitionsAcrossWorkers(conf, + partitionOwnerList, allPartitionStatsList, availableWorkers); + } + + @Override + public Collection getCurrentPartitionOwners() { + return partitionOwnerList; + } + + @Override + public PartitionStats createPartitionStats() { + return new PartitionStats(); + } + + /** + * Calculates worker that should be responsible for passed partition. + * + * @param partition Current partition + * @param partitionCount Number of partitions + * @param workerCount Number of workers + * @return index of worker responsible for current partition + */ + protected abstract int getWorkerIndex( + int partition, int partitionCount, int workerCount); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java new file mode 100644 index 0000000..15b0756 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.partition; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Abstracts and implements all GraphPartitionerFactory logic on top of two + * functions which define partitioning scheme: + * - which partition user should be in, and + * - which partition should belong to which worker + * + * @param Vertex id + * @param Vertex value + * @param Edge value + */ +public abstract class SimplePartitionerFactory + implements GraphPartitionerFactory { + /** Configuration. */ + private ImmutableClassesGiraphConfiguration conf; + + @Override + public final MasterGraphPartitioner createMasterGraphPartitioner() { + return new SimpleMasterPartitioner(conf) { + @Override + protected int getWorkerIndex(int partition, int partitionCount, + int workerCount) { + return SimplePartitionerFactory.this.getWorker( + partition, partitionCount, workerCount); + } + }; + } + + @Override + public final WorkerGraphPartitioner createWorkerGraphPartitioner() { + return new SimpleWorkerPartitioner() { + @Override + protected int getPartitionIndex(I id, int partitionCount) { + return SimplePartitionerFactory.this.getPartition(id, partitionCount); + } + }; + } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration conf) { + this.conf = conf; + } + + @Override + public final ImmutableClassesGiraphConfiguration getConf() { + return conf; + } + + /** + * Calculates in which partition current vertex belongs to, + * from interval [0, partitionCount). + * + * @param id Vertex id + * @param partitionCount Number of partitions + * @return partition + */ + protected abstract int getPartition(I id, int partitionCount); + /** + * Calculates worker that should be responsible for passed partition. + * + * @param partition Current partition + * @param partitionCount Number of partitions + * @param workerCount Number of workers + * @return index of worker responsible for current partition + */ + protected abstract int getWorker( + int partition, int partitionCount, int workerCount); + + /** + * Utility function for calculating in which partition value + * from interval [0, max) should belong to. + * + * @param value Value for which partition is requested + * @param max Maximum possible value + * @param partitions Number of partitions, equally sized. + * @return Index of partition where value belongs to. + */ + public static int getPartitionInRange(int value, int max, int partitions) { + double keyRange = ((double) max) / partitions; + int part = (int) ((value % max) / keyRange); + return Math.max(0, Math.min(partitions - 1, part)); + } + + /** + * Utility function for calculating in which partition value + * from interval [0, max) should belong to. + * + * @param value Value for which partition is requested + * @param max Maximum possible value + * @param partitions Number of partitions, equally sized. + * @return Index of partition where value belongs to. + */ + public static int getPartitionInRange(long value, long max, int partitions) { + double keyRange = ((double) max) / partitions; + int part = (int) ((value % max) / keyRange); + return Math.max(0, Math.min(partitions - 1, part)); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java deleted file mode 100644 index 37ce8c7..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.partition; - - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.worker.WorkerInfo; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.log4j.Logger; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -/** - * A range-based master partitioner where equal-sized ranges of partitions - * are deterministically assigned to workers. - * - * @param Vertex id type - * @param Vertex value type - * @param Edge value type - */ -public class SimpleRangeMasterPartitioner implements - MasterGraphPartitioner { - /** Class logger */ - private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class); - /** Provided configuration */ - private ImmutableClassesGiraphConfiguration conf; - /** Save the last generated partition owner list */ - private List partitionOwnerList; - - /** - * Constructor. - * - * @param conf Configuration used. - */ - public SimpleRangeMasterPartitioner( - ImmutableClassesGiraphConfiguration conf) { - this.conf = conf; - } - - @Override - public Collection createInitialPartitionOwners( - Collection availableWorkerInfos, int maxWorkers) { - int partitionCount = PartitionUtils.computePartitionCount( - availableWorkerInfos, maxWorkers, conf); - int rangeSize = partitionCount / availableWorkerInfos.size(); - - partitionOwnerList = new ArrayList(); - Iterator workerIt = availableWorkerInfos.iterator(); - WorkerInfo currentWorker = null; - - int i = 0; - for (; i < partitionCount; ++i) { - if (i % rangeSize == 0) { - if (!workerIt.hasNext()) { - break; - } - currentWorker = workerIt.next(); - } - partitionOwnerList.add(new BasicPartitionOwner(i, currentWorker)); - } - - // Distribute the remainder among all workers. - if (i < partitionCount) { - workerIt = availableWorkerInfos.iterator(); - for (; i < partitionCount; ++i) { - partitionOwnerList.add(new BasicPartitionOwner(i, workerIt.next())); - } - } - - return partitionOwnerList; - } - - @Override - public Collection generateChangedPartitionOwners( - Collection allPartitionStatsList, - Collection availableWorkers, - int maxWorkers, - long superstep) { - return PartitionBalancer.balancePartitionsAcrossWorkers( - conf, - partitionOwnerList, - allPartitionStatsList, - availableWorkers); - } - - @Override - public Collection getCurrentPartitionOwners() { - return partitionOwnerList; - } - - @Override - public PartitionStats createPartitionStats() { - return new PartitionStats(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java deleted file mode 100644 index ab2afd5..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.partition; - -import com.google.common.collect.Lists; -import org.apache.giraph.worker.WorkerInfo; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import java.util.Collection; -import java.util.List; - -/** - * A range-based worker partitioner where equal-sized ranges of vertex ids - * are deterministically assigned to partitions. - * The user has to define a mapping from vertex ids to long keys dense in - * [0, keySpaceSize). - * - * @param Vertex id type - * @param Vertex value type - * @param Edge value type - */ -public abstract class SimpleRangeWorkerPartitioner - implements WorkerGraphPartitioner { - /** List of {@link PartitionOwner}s for this worker. */ - private List partitionOwnerList = Lists.newArrayList(); - /** Vertex keys space size. */ - private long keySpaceSize; - - /** - * Constructor. - * - * @param keySpaceSize Vertex keys space size. - */ - public SimpleRangeWorkerPartitioner(long keySpaceSize) { - this.keySpaceSize = keySpaceSize; - } - - /** - * Get key space size (can be used when implementing vertexKeyFromId()). - * - * @return Key space size. - */ - public long getKeySpaceSize() { - return keySpaceSize; - } - - /** - * Convert a vertex id to a unique long key in [0, keySpaceSize]. - * - * @param id Vertex id - * @return Unique long key - */ - protected abstract long vertexKeyFromId(I id); - - @Override - public PartitionOwner createPartitionOwner() { - return new BasicPartitionOwner(); - } - - @Override - public PartitionOwner getPartitionOwner(I vertexId) { - long rangeSize = keySpaceSize / partitionOwnerList.size(); - return partitionOwnerList.get( - Math.min((int) (vertexKeyFromId(vertexId) / rangeSize), - partitionOwnerList.size() - 1)); - } - - @Override - public Collection finalizePartitionStats( - Collection workerPartitionStats, - PartitionStore partitionStore) { - // No modification necessary - return workerPartitionStats; - } - - @Override - public PartitionExchange updatePartitionOwners( - WorkerInfo myWorkerInfo, - Collection masterSetPartitionOwners, - PartitionStore partitionStore) { - return PartitionBalancer.updatePartitionOwners(partitionOwnerList, - myWorkerInfo, masterSetPartitionOwners, partitionStore); - } - - @Override - public Collection getPartitionOwners() { - return partitionOwnerList; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java new file mode 100644 index 0000000..600d7a3 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.partition; + +import java.util.Collection; +import java.util.List; + +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.collect.Lists; + +/** + * Abstracts and implements all WorkerGraphPartitioner logic on top of a single + * user function - getPartitionIndex. + * + * @param Vertex id type + * @param Vertex value type + * @param Edge value type + */ +public abstract class SimpleWorkerPartitioner + implements WorkerGraphPartitioner { + /** List of {@link PartitionOwner}s for this worker. */ + private List partitionOwnerList = Lists.newArrayList(); + + @Override + public PartitionOwner createPartitionOwner() { + return new BasicPartitionOwner(); + } + + @Override + public PartitionOwner getPartitionOwner(I vertexId) { + return partitionOwnerList.get( + getPartitionIndex(vertexId, partitionOwnerList.size())); + } + + @Override + public Collection finalizePartitionStats( + Collection workerPartitionStats, + PartitionStore partitionStore) { + // No modification necessary + return workerPartitionStats; + } + + @Override + public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo, + Collection masterSetPartitionOwners, + PartitionStore partitionStore) { + return PartitionBalancer.updatePartitionOwners(partitionOwnerList, + myWorkerInfo, masterSetPartitionOwners, partitionStore); + } + + @Override + public Collection getPartitionOwners() { + return partitionOwnerList; + } + + /** + * Calculates in which partition current vertex belongs to, + * from interval [0, partitionCount). + * + * @param id Vertex id + * @param partitionCount Number of partitions + * @return partition + */ + protected abstract int getPartitionIndex(I id, int partitionCount); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java new file mode 100644 index 0000000..4e19cd2 --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.partition; + +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.partition.PartitionOwner; +import org.apache.giraph.partition.WorkerGraphPartitioner; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +/** Test {@link org.apache.giraph.partition.SimpleLongRangePartitionerFactory}. */ +public class SimpleRangePartitionFactoryTest { + + private void testRange(int numWorkers, int keySpaceSize, int allowedWorkerDiff, boolean emptyWorkers) { + Configuration conf = new Configuration(); + conf.setLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, keySpaceSize); + SimpleLongRangePartitionerFactory factory = + new SimpleLongRangePartitionerFactory(); + factory.setConf(new ImmutableClassesGiraphConfiguration(conf)); + + ArrayList infos = new ArrayList(); + for (int i = 0; i < numWorkers; i++) { + WorkerInfo info = new WorkerInfo(); + info.setTaskId(i); + infos.add(info); + } + + Collection owners = + factory.createMasterGraphPartitioner().createInitialPartitionOwners(infos, -1); + + int[] tasks = new int[owners.size()]; + for (PartitionOwner owner : owners) { + WorkerInfo worker = owner.getWorkerInfo(); + assertEquals(0, tasks[owner.getPartitionId()]); + tasks[owner.getPartitionId()] = worker.getTaskId() + 1; + } + checkMapping(tasks, allowedWorkerDiff, emptyWorkers); + + WorkerGraphPartitioner workerPartitioner = + factory.createWorkerGraphPartitioner(); + workerPartitioner.updatePartitionOwners(null, owners, null); + LongWritable longWritable = new LongWritable(); + + int[] partitions = new int[keySpaceSize]; + for (int i = 0; i < keySpaceSize; i++) { + longWritable.set(i); + PartitionOwner owner = workerPartitioner.getPartitionOwner(longWritable); + partitions[i] = owner.getPartitionId(); + } + checkMapping(partitions, 1, emptyWorkers); + } + + private void checkMapping(int[] mapping, int allowedDiff, boolean emptyWorkers) { + int prev = -1; + + int max = 0; + int min = Integer.MAX_VALUE; + int cur = 0; + for (int value : mapping) { + if (value != prev) { + if (prev != -1) { + min = Math.min(cur, min); + max = Math.max(cur, max); + assertTrue(prev < value); + if (!emptyWorkers) { + assertEquals(prev + 1, value); + } + } + cur = 1; + } else { + cur++; + } + prev = value; + } + assertTrue(min + allowedDiff >= max); + } + + @Test + public void testLongRangePartitionerFactory() { + // perfect distribution + testRange(10, 100000, 0, false); + testRange(1000, 100000, 0, false); + + // perfect distribution even when max is hit, and max is not divisible by #workers + testRange(8949, 100023, 0, false); + testRange(1949, 211111, 0, false); + + // imperfect distribution - because there are more workers than max partitions. + testRange(194942, 211111, 1, true); + } +}