Return-Path: X-Original-To: apmail-incubator-giraph-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-giraph-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E055D9E53 for ; Thu, 16 Feb 2012 22:13:38 +0000 (UTC) Received: (qmail 3452 invoked by uid 500); 16 Feb 2012 22:13:38 -0000 Delivered-To: apmail-incubator-giraph-commits-archive@incubator.apache.org Received: (qmail 3424 invoked by uid 500); 16 Feb 2012 22:13:38 -0000 Mailing-List: contact giraph-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: giraph-dev@incubator.apache.org Delivered-To: mailing list giraph-commits@incubator.apache.org Received: (qmail 3413 invoked by uid 99); 16 Feb 2012 22:13:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2012 22:13:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Feb 2012 22:13:34 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9042E2388C1F; Thu, 16 Feb 2012 22:12:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1245205 [15/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/example... 
Date: Thu, 16 Feb 2012 22:12:36 -0000 To: giraph-commits@incubator.apache.org From: aching@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120216221244.9042E2388C1F@eris.apache.org> Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java Thu Feb 16 22:12:31 2012 @@ -37,102 +37,115 @@ import com.google.common.collect.Maps; * Helper class for {@link Partition} related operations. */ public class PartitionUtils { - /** Class logger */ - private static Logger LOG = Logger.getLogger(PartitionUtils.class); + /** Class logger */ + private static Logger LOG = Logger.getLogger(PartitionUtils.class); - private static class EdgeCountComparator implements - Comparator> { + /** + * Do not construct this object. + */ + private PartitionUtils() { } + + /** + * Compare edge counts for Entry objects. + */ + private static class EdgeCountComparator implements + Comparator> { + @Override + public int compare(Entry worker1, + Entry worker2) { + return (int) (worker1.getValue().getEdgeCount() - + worker2.getValue().getEdgeCount()); + } + } - @Override - public int compare(Entry worker1, - Entry worker2) { - return (int) (worker1.getValue().getEdgeCount() - - worker2.getValue().getEdgeCount()); - } + /** + * Compare vertex counts between a {@link WorkerInfo} and + * {@link VertexEdgeCount}. 
+ */ + private static class VertexCountComparator implements + Comparator> { + @Override + public int compare(Entry worker1, + Entry worker2) { + return (int) (worker1.getValue().getVertexCount() - + worker2.getValue().getVertexCount()); } + } - private static class VertexCountComparator implements - Comparator> { + /** + * Check for imbalances on a per worker basis, by calculating the + * mean, high and low workers by edges and vertices. + * + * @param partitionOwnerList List of partition owners. + * @param allPartitionStats All the partition stats. + */ + public static void analyzePartitionStats( + Collection partitionOwnerList, + List allPartitionStats) { + Map idOwnerMap = + new HashMap(); + for (PartitionOwner partitionOwner : partitionOwnerList) { + if (idOwnerMap.put(partitionOwner.getPartitionId(), + partitionOwner) != null) { + throw new IllegalStateException( + "analyzePartitionStats: Duplicate partition " + + partitionOwner); + } + } - @Override - public int compare(Entry worker1, - Entry worker2) { - return (int) (worker1.getValue().getEdgeCount() - - worker2.getValue().getEdgeCount()); - } + Map workerStatsMap = Maps.newHashMap(); + VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount(); + for (PartitionStats partitionStats : allPartitionStats) { + WorkerInfo workerInfo = + idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo(); + VertexEdgeCount vertexEdgeCount = + workerStatsMap.get(workerInfo); + if (vertexEdgeCount == null) { + workerStatsMap.put( + workerInfo, + new VertexEdgeCount(partitionStats.getVertexCount(), + partitionStats.getEdgeCount())); + } else { + workerStatsMap.put( + workerInfo, + vertexEdgeCount.incrVertexEdgeCount( + partitionStats.getVertexCount(), + partitionStats.getEdgeCount())); + } + totalVertexEdgeCount = + totalVertexEdgeCount.incrVertexEdgeCount( + partitionStats.getVertexCount(), + partitionStats.getEdgeCount()); } - /** - * Check for imbalances on a per worker basis, by calculating the - * mean, high and 
low workers by edges and vertices. - */ - public static void analyzePartitionStats( - Collection partitionOwnerList, - List allPartitionStats) { - Map idOwnerMap = - new HashMap(); - for (PartitionOwner partitionOwner : partitionOwnerList) { - if (idOwnerMap.put(partitionOwner.getPartitionId(), - partitionOwner) != null) { - throw new IllegalStateException( - "analyzePartitionStats: Duplicate partition " + - partitionOwner); - } - } - - Map workerStatsMap = Maps.newHashMap(); - VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount(); - for (PartitionStats partitionStats : allPartitionStats) { - WorkerInfo workerInfo = - idOwnerMap.get(partitionStats.getPartitionId()).getWorkerInfo(); - VertexEdgeCount vertexEdgeCount = - workerStatsMap.get(workerInfo); - if (vertexEdgeCount == null) { - workerStatsMap.put( - workerInfo, - new VertexEdgeCount(partitionStats.getVertexCount(), - partitionStats.getEdgeCount())); - } else { - workerStatsMap.put( - workerInfo, - vertexEdgeCount.incrVertexEdgeCount( - partitionStats.getVertexCount(), - partitionStats.getEdgeCount())); - } - totalVertexEdgeCount = - totalVertexEdgeCount.incrVertexEdgeCount( - partitionStats.getVertexCount(), - partitionStats.getEdgeCount()); - } - - List> workerEntryList = - Lists.newArrayList(workerStatsMap.entrySet()); - - if (LOG.isInfoEnabled()) { - Collections.sort(workerEntryList, new VertexCountComparator()); - LOG.info("analyzePartitionStats: Vertices - Mean: " + - (totalVertexEdgeCount.getVertexCount() / - workerStatsMap.size()) + - ", Min: " + - workerEntryList.get(0).getKey() + " - " + - workerEntryList.get(0).getValue().getVertexCount() + - ", Max: "+ - workerEntryList.get(workerEntryList.size() - 1).getKey() + - " - " + - workerEntryList.get(workerEntryList.size() - 1). 
- getValue().getVertexCount()); - Collections.sort(workerEntryList, new EdgeCountComparator()); - LOG.info("analyzePartitionStats: Edges - Mean: " + - (totalVertexEdgeCount.getEdgeCount() / - workerStatsMap.size()) + - ", Min: " + - workerEntryList.get(0).getKey() + " - " + - workerEntryList.get(0).getValue().getEdgeCount() + - ", Max: "+ - workerEntryList.get(workerEntryList.size() - 1).getKey() + - " - " + - workerEntryList.get(workerEntryList.size() - 1). - getValue().getEdgeCount()); - } + List> workerEntryList = + Lists.newArrayList(workerStatsMap.entrySet()); + + if (LOG.isInfoEnabled()) { + Collections.sort(workerEntryList, new VertexCountComparator()); + LOG.info("analyzePartitionStats: Vertices - Mean: " + + (totalVertexEdgeCount.getVertexCount() / + workerStatsMap.size()) + + ", Min: " + + workerEntryList.get(0).getKey() + " - " + + workerEntryList.get(0).getValue().getVertexCount() + + ", Max: " + + workerEntryList.get(workerEntryList.size() - 1).getKey() + + " - " + + workerEntryList.get(workerEntryList.size() - 1). + getValue().getVertexCount()); + Collections.sort(workerEntryList, new EdgeCountComparator()); + LOG.info("analyzePartitionStats: Edges - Mean: " + + (totalVertexEdgeCount.getEdgeCount() / + workerStatsMap.size()) + + ", Min: " + + workerEntryList.get(0).getKey() + " - " + + workerEntryList.get(0).getValue().getEdgeCount() + + ", Max: " + + workerEntryList.get(workerEntryList.size() - 1).getKey() + + " - " + + workerEntryList.get(workerEntryList.size() - 1). 
+ getValue().getEdgeCount()); } + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java Thu Feb 16 22:12:31 2012 @@ -34,11 +34,10 @@ import org.apache.hadoop.io.WritableComp */ @SuppressWarnings("rawtypes") public abstract class RangeMasterPartitioner implements - MasterGraphPartitioner { - - @Override - public PartitionStats createPartitionStats() { - return new RangePartitionStats(); - } + V extends Writable, E extends Writable, M extends Writable> implements + MasterGraphPartitioner { + @Override + public PartitionStats createPartitionStats() { + return new RangePartitionStats(); + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java Thu Feb 16 22:12:31 2012 @@ -33,31 +33,43 @@ import org.apache.hadoop.io.WritableComp */ @SuppressWarnings("rawtypes") public class RangePartitionOwner - extends BasicPartitionOwner { - /** Max index for this partition */ - private I maxIndex; + extends BasicPartitionOwner { + /** Max index for this 
partition */ + private I maxIndex; - public RangePartitionOwner() { - } + /** + * Default constructor. + */ + public RangePartitionOwner() { } - public RangePartitionOwner(I maxIndex) { - this.maxIndex = maxIndex; - } + /** + * Constructor with the max index. + * + * @param maxIndex Max index of this partition. + */ + public RangePartitionOwner(I maxIndex) { + this.maxIndex = maxIndex; + } - public I getMaxIndex() { - return maxIndex; - } + /** + * Get the maximum index of this partition owner. + * + * @return Maximum index. + */ + public I getMaxIndex() { + return maxIndex; + } - @Override - public void readFields(DataInput input) throws IOException { - super.readFields(input); - maxIndex = BspUtils.createVertexIndex(getConf()); - maxIndex.readFields(input); - } + @Override + public void readFields(DataInput input) throws IOException { + super.readFields(input); + maxIndex = BspUtils.createVertexIndex(getConf()); + maxIndex.readFields(input); + } - @Override - public void write(DataOutput output) throws IOException { - super.write(output); - maxIndex.write(output); - } + @Override + public void write(DataOutput output) throws IOException { + super.write(output); + maxIndex.write(output); + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java Thu Feb 16 22:12:31 2012 @@ -32,37 +32,37 @@ import org.apache.hadoop.io.WritableComp */ @SuppressWarnings("rawtypes") public class RangePartitionStats - extends PartitionStats { - /** Can be null if no hint, 
otherwise a splitting hint */ - private RangeSplitHint hint; + extends PartitionStats { + /** Can be null if no hint, otherwise a splitting hint */ + private RangeSplitHint hint; - /** - * Get the range split hint (if any) - * - * @return Hint of how to split the range if desired, null otherwise - */ - public RangeSplitHint getRangeSplitHint() { - return hint; - } + /** + * Get the range split hint (if any) + * + * @return Hint of how to split the range if desired, null otherwise + */ + public RangeSplitHint getRangeSplitHint() { + return hint; + } - @Override - public void readFields(DataInput input) throws IOException { - super.readFields(input); - boolean hintExists = input.readBoolean(); - if (hintExists) { - hint = new RangeSplitHint(); - hint.readFields(input); - } else { - hint = null; - } + @Override + public void readFields(DataInput input) throws IOException { + super.readFields(input); + boolean hintExists = input.readBoolean(); + if (hintExists) { + hint = new RangeSplitHint(); + hint.readFields(input); + } else { + hint = null; } + } - @Override - public void write(DataOutput output) throws IOException { - super.write(output); - output.writeBoolean(hint != null); - if (hint != null) { - hint.write(output); - } + @Override + public void write(DataOutput output) throws IOException { + super.write(output); + output.writeBoolean(hint != null); + if (hint != null) { + hint.write(output); } + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java Thu Feb 16 22:12:31 
2012 @@ -36,38 +36,38 @@ import org.apache.hadoop.io.WritableComp */ @SuppressWarnings("rawtypes") public class RangeSplitHint - implements Writable, Configurable { - /** Hinted split index */ - private I splitIndex; - /** Number of vertices in this range before the split */ - private long preSplitVertexCount; - /** Number of vertices in this range after the split */ - private long postSplitVertexCount; - /** Configuration */ - private Configuration conf; + implements Writable, Configurable { + /** Hinted split index */ + private I splitIndex; + /** Number of vertices in this range before the split */ + private long preSplitVertexCount; + /** Number of vertices in this range after the split */ + private long postSplitVertexCount; + /** Configuration */ + private Configuration conf; - @Override - public void readFields(DataInput input) throws IOException { - splitIndex = BspUtils.createVertexIndex(conf); - splitIndex.readFields(input); - preSplitVertexCount = input.readLong(); - postSplitVertexCount = input.readLong(); - } + @Override + public void readFields(DataInput input) throws IOException { + splitIndex = BspUtils.createVertexIndex(conf); + splitIndex.readFields(input); + preSplitVertexCount = input.readLong(); + postSplitVertexCount = input.readLong(); + } - @Override - public void write(DataOutput output) throws IOException { - splitIndex.write(output); - output.writeLong(preSplitVertexCount); - output.writeLong(postSplitVertexCount); - } + @Override + public void write(DataOutput output) throws IOException { + splitIndex.write(output); + output.writeLong(preSplitVertexCount); + output.writeLong(postSplitVertexCount); + } - @Override - public Configuration getConf() { - return conf; - } + @Override + public Configuration getConf() { + return conf; + } - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } } Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java Thu Feb 16 22:12:31 2012 @@ -43,36 +43,36 @@ import org.apache.hadoop.io.WritableComp */ @SuppressWarnings("rawtypes") public abstract class RangeWorkerPartitioner implements - WorkerGraphPartitioner { - /** Mapping of the vertex ids to the {@link PartitionOwner} */ - protected NavigableMap> vertexRangeMap = - new TreeMap>(); + V extends Writable, E extends Writable, M extends Writable> implements + WorkerGraphPartitioner { + /** Mapping of the vertex ids to the {@link PartitionOwner} */ + protected NavigableMap> vertexRangeMap = + new TreeMap>(); - @Override - public PartitionOwner createPartitionOwner() { - return new RangePartitionOwner(); - } + @Override + public PartitionOwner createPartitionOwner() { + return new RangePartitionOwner(); + } - @Override - public PartitionOwner getPartitionOwner(I vertexId) { - // Find the partition owner based on the maximum partition id. - // If the vertex id exceeds any of the maximum partition ids, give - // it to the last one - if (vertexId == null) { - throw new IllegalArgumentException( - "getPartitionOwner: Illegal null vertex id"); - } - I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId); - if (maxVertexIndex == null) { - return vertexRangeMap.lastEntry().getValue(); - } else { - return vertexRangeMap.get(vertexId); - } + @Override + public PartitionOwner getPartitionOwner(I vertexId) { + // Find the partition owner based on the maximum partition id. 
+ // If the vertex id exceeds any of the maximum partition ids, give + // it to the last one + if (vertexId == null) { + throw new IllegalArgumentException( + "getPartitionOwner: Illegal null vertex id"); } - - @Override - public Collection getPartitionOwners() { - return vertexRangeMap.values(); + I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId); + if (maxVertexIndex == null) { + return vertexRangeMap.lastEntry().getValue(); + } else { + return vertexRangeMap.get(vertexId); } + } + + @Override + public Collection getPartitionOwners() { + return vertexRangeMap.values(); + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java Thu Feb 16 22:12:31 2012 @@ -29,62 +29,67 @@ import org.apache.hadoop.io.WritableComp * Stores the {@link PartitionOwner} objects from the master and provides the * mapping of vertex to {@link PartitionOwner}. Also generates the partition * owner implementation. + * + * @param Vertex id + * @param Vertex value + * @param Edge value + * @param Message data */ @SuppressWarnings("rawtypes") public interface WorkerGraphPartitioner { - /** - * Instantiate the {@link PartitionOwner} implementation used to read the - * master assignments. - * - * @return Instantiated {@link PartitionOwner} object - */ - PartitionOwner createPartitionOwner(); + V extends Writable, E extends Writable, M extends Writable> { + /** + * Instantiate the {@link PartitionOwner} implementation used to read the + * master assignments. 
+ * + * @return Instantiated {@link PartitionOwner} object + */ + PartitionOwner createPartitionOwner(); - /** - * Figure out the owner of a vertex - * - * @param vertexId Vertex id to get the partition for - * @return Correct partition owner - */ - PartitionOwner getPartitionOwner(I vertexId); + /** + * Figure out the owner of a vertex + * + * @param vertexId Vertex id to get the partition for + * @return Correct partition owner + */ + PartitionOwner getPartitionOwner(I vertexId); - /** - * At the end of a superstep, workers have {@link PartitionStats} generated - * for each of their partitions. This method will allow the user to - * modify or create their own {@link PartitionStats} interfaces to send to - * the master. - * - * @param workerPartitionStats Stats generated by the infrastructure during - * the superstep - * @param partitionMap Map of all the partitions owned by this worker - * (could be used to provide more useful stat information) - * @return Final partition stats - */ - Collection finalizePartitionStats( - Collection workerPartitionStats, - Map> partitionMap); + /** + * At the end of a superstep, workers have {@link PartitionStats} generated + * for each of their partitions. This method will allow the user to + * modify or create their own {@link PartitionStats} interfaces to send to + * the master. + * + * @param workerPartitionStats Stats generated by the infrastructure during + * the superstep + * @param partitionMap Map of all the partitions owned by this worker + * (could be used to provide more useful stat information) + * @return Final partition stats + */ + Collection finalizePartitionStats( + Collection workerPartitionStats, + Map> partitionMap); - /** - * Get the partitions owners and update locally. Returns the partitions - * to send to other workers and other dependencies. - * - * @param myWorkerInfo Worker info. 
- * @param masterSetPartitionOwners Master set partition owners, received - * prior to beginning the superstep - * @param partitionMap Map of all the partitions owned by this worker - * (can be used to fill the return map of partitions to send) - * @return Information for the partition exchange. - */ - PartitionExchange updatePartitionOwners( - WorkerInfo myWorkerInfo, - Collection masterSetPartitionOwners, - Map> partitionMap); + /** + * Get the partitions owners and update locally. Returns the partitions + * to send to other workers and other dependencies. + * + * @param myWorkerInfo Worker info. + * @param masterSetPartitionOwners Master set partition owners, received + * prior to beginning the superstep + * @param partitionMap Map of all the partitions owned by this worker + * (can be used to fill the return map of partitions to send) + * @return Information for the partition exchange. + */ + PartitionExchange updatePartitionOwners( + WorkerInfo myWorkerInfo, + Collection masterSetPartitionOwners, + Map> partitionMap); - /** - * Get a collection of the {@link PartitionOwner} objects. - * - * @return Collection of owners for every partition. - */ - Collection getPartitionOwners(); + /** + * Get a collection of the {@link PartitionOwner} objects. + * + * @return Collection of owners for every partition. 
+ */ + Collection getPartitionOwners(); } Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java) URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/package-info.java Thu Feb 16 22:12:31 2012 @@ -15,15 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.giraph.bsp; - /** - * State of the BSP application + * Package of partitioning related objects. 
*/ -public enum ApplicationState { - UNKNOWN, ///< Shouldn't be seen, just an initial state - START_SUPERSTEP, ///< Start from a desired superstep - FAILED, ///< Unrecoverable - FINISHED ///< Successful completion -} +package org.apache.giraph.graph.partition; Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspPolicyProvider.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspPolicyProvider.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspPolicyProvider.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspPolicyProvider.java Thu Feb 16 22:12:31 2012 @@ -23,18 +23,21 @@ import org.apache.hadoop.security.author import org.apache.hadoop.security.authorize.Service; /** - * {@link PolicyProvider} for Map-Reduce protocols. - */ + * {@link PolicyProvider} for Map-Reduce protocols. + */ public class BspPolicyProvider extends PolicyProvider { - private static final Service[] bspCommunicationsServices = - new Service[] { - new Service("security.bsp.communications.protocol.acl", - CommunicationsInterface.class), + /** + * Communication services array. 
+ */ + private static final Service[] BSP_COMMUNICATION_SERVICES = + new Service[] { + new Service("security.bsp.communications.protocol.acl", + CommunicationsInterface.class), }; - @Override - public Service[] getServices() { - return bspCommunicationsServices; - } + @Override + public Service[] getServices() { + return BSP_COMMUNICATION_SERVICES; + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspTokenSelector.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspTokenSelector.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspTokenSelector.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/BspTokenSelector.java Thu Feb 16 22:12:31 2012 @@ -27,25 +27,24 @@ import org.apache.hadoop.security.token. import org.apache.hadoop.security.token.TokenSelector; /** - * Look through tokens to find the first job token that matches the service - * and return it. - */ + * Look through tokens to find the first job token that matches the service + * and return it. 
+ */ public class BspTokenSelector implements TokenSelector { - - @SuppressWarnings("unchecked") - @Override - public Token selectToken(Text service, - Collection> tokens) { - if (service == null) { - return null; - } - Text KIND_NAME = new Text("mapreduce.job"); - for (Token token : tokens) { - if (KIND_NAME.equals(token.getKind())) { - return (Token) token; - } - } - return null; + @SuppressWarnings("unchecked") + @Override + public Token selectToken(Text service, + Collection> tokens) { + if (service == null) { + return null; + } + Text kindName = new Text("mapreduce.job"); + for (Token token : tokens) { + if (kindName.equals(token.getKind())) { + return (Token) token; + } } + return null; + } } Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java) URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/hadoop/package-info.java Thu Feb 16 22:12:31 2012 @@ -15,15 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.giraph.bsp; - /** - * State of the BSP application + * Package of policy and token information for Hadoop. 
*/ -public enum ApplicationState { - UNKNOWN, ///< Shouldn't be seen, just an initial state - START_SUPERSTEP, ///< Start from a desired superstep - FAILED, ///< Unrecoverable - FINISHED ///< Successful completion -} +package org.apache.giraph.hadoop; Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java Thu Feb 16 22:12:31 2012 @@ -38,84 +38,90 @@ import org.apache.log4j.Logger; * Example graph partitioner that builds on {@link HashMasterPartitioner} to * send the partitions to the worker that matches the superstep. It is for * testing only and should never be used in practice. + * + * @param Vertex id + * @param Vertex value + * @param Edge value + * @param Message data */ @SuppressWarnings("rawtypes") -public class SuperstepHashPartitionerFactory< - I extends WritableComparable, - V extends Writable, E extends Writable, M extends Writable> - extends HashPartitionerFactory { +public class SuperstepHashPartitionerFactory + extends HashPartitionerFactory { + /** + * Changes the {@link HashMasterPartitioner} to make ownership of the + * partitions based on a superstep. For testing only as it is totally + * unbalanced. 
+ * + * @param vertex id + * @param vertex data + * @param edge data + * @param message data + */ + private static class SuperstepMasterPartition + extends HashMasterPartitioner { + /** Class logger */ + private static Logger LOG = + Logger.getLogger(SuperstepMasterPartition.class); /** - * Changes the {@link HashMasterPartitioner} to make ownership of the - * partitions based on a superstep. For testing only as it is totally - * unbalanced. + * Construction with configuration. * - * @param vertex id - * @param vertex data - * @param edge data - * @param message data + * @param conf Configuration to be stored. */ - private static class SuperstepMasterPartition< - I extends WritableComparable, - V extends Writable, E extends Writable, M extends Writable> - extends HashMasterPartitioner { - /** Class logger */ - private static Logger LOG = - Logger.getLogger(SuperstepMasterPartition.class); + public SuperstepMasterPartition(Configuration conf) { + super(conf); + } - public SuperstepMasterPartition(Configuration conf) { - super(conf); + @Override + public Collection generateChangedPartitionOwners( + Collection allPartitionStatsList, + Collection availableWorkerInfos, + int maxWorkers, + long superstep) { + // Assign all the partitions to + // superstep mod availableWorkerInfos + // Guaranteed to be different if the workers (and their order) + // do not change + long workerIndex = superstep % availableWorkerInfos.size(); + int i = 0; + WorkerInfo chosenWorkerInfo = null; + for (WorkerInfo workerInfo : availableWorkerInfos) { + if (workerIndex == i) { + chosenWorkerInfo = workerInfo; } + ++i; + } + if (LOG.isInfoEnabled()) { + LOG.info("generateChangedPartitionOwners: Chosen worker " + + "for superstep " + superstep + " is " + + chosenWorkerInfo); + } - @Override - public Collection generateChangedPartitionOwners( - Collection allPartitionStatsList, - Collection availableWorkerInfos, - int maxWorkers, - long superstep) { - // Assign all the partitions to - // superstep 
mod availableWorkerInfos - // Guaranteed to be different if the workers (and their order) - // do not change - long workerIndex = superstep % availableWorkerInfos.size(); - int i = 0; - WorkerInfo chosenWorkerInfo = null; - for (WorkerInfo workerInfo : availableWorkerInfos) { - if (workerIndex == i) { - chosenWorkerInfo = workerInfo; - } - ++i; - } - if (LOG.isInfoEnabled()) { - LOG.info("generateChangedPartitionOwners: Chosen worker " + - "for superstep " + superstep + " is " + - chosenWorkerInfo); - } - - List partitionOwnerList = - new ArrayList(); - for (PartitionOwner partitionOwner : - getCurrentPartitionOwners()) { - WorkerInfo prevWorkerinfo = - partitionOwner.getWorkerInfo().equals(chosenWorkerInfo) ? - null : partitionOwner.getWorkerInfo(); - PartitionOwner tmpPartitionOwner = - new BasicPartitionOwner(partitionOwner.getPartitionId(), - chosenWorkerInfo, - prevWorkerinfo, - null); - partitionOwnerList.add(tmpPartitionOwner); - LOG.info("partition owner was " + partitionOwner + - ", new " + tmpPartitionOwner); - } - setPartitionOwnerList(partitionOwnerList); - return partitionOwnerList; - } + List partitionOwnerList = new ArrayList(); + for (PartitionOwner partitionOwner : + getCurrentPartitionOwners()) { + WorkerInfo prevWorkerinfo = + partitionOwner.getWorkerInfo().equals(chosenWorkerInfo) ? 
+ null : partitionOwner.getWorkerInfo(); + PartitionOwner tmpPartitionOwner = + new BasicPartitionOwner(partitionOwner.getPartitionId(), + chosenWorkerInfo, + prevWorkerinfo, + null); + partitionOwnerList.add(tmpPartitionOwner); + LOG.info("partition owner was " + partitionOwner + + ", new " + tmpPartitionOwner); + } + setPartitionOwnerList(partitionOwnerList); + return partitionOwnerList; } + } - @Override - public MasterGraphPartitioner - createMasterGraphPartitioner() { - return new SuperstepMasterPartition(getConf()); - } + @Override + public MasterGraphPartitioner + createMasterGraphPartitioner() { + return new SuperstepMasterPartition(getConf()); + } } Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java) URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/integration/package-info.java Thu Feb 16 22:12:31 2012 @@ -15,15 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.giraph.bsp; - /** - * State of the BSP application + * Package of all helper integration test objects. 
*/ -public enum ApplicationState { - UNKNOWN, ///< Shouldn't be seen, just an initial state - START_SUPERSTEP, ///< Start from a desired superstep - FAILED, ///< Unrecoverable - FINISHED ///< Successful completion -} +package org.apache.giraph.integration; Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java Thu Feb 16 22:12:31 2012 @@ -38,26 +38,41 @@ import java.io.IOException; * @param Edge value */ @SuppressWarnings("rawtypes") -public class AdjacencyListTextVertexOutputFormat extends TextVertexOutputFormat{ - +public class AdjacencyListTextVertexOutputFormat + extends TextVertexOutputFormat { + + /** + * Vertex writer associated wtih {@link AdjacencyListTextVertexOutputFormat}. + * + * @param Vertex id + * @param Vertex data + * @param Edge data + */ static class AdjacencyListVertexWriter extends TextVertexWriter { + /** Split delimiter */ public static final String LINE_TOKENIZE_VALUE = "output.delimiter"; + /** Default split delimiter */ public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t"; - + /** Cached split delimeter */ private String delimiter; - public AdjacencyListVertexWriter(RecordWriter recordWriter) { + /** + * Constructor with writer. + * + * @param recordWriter Record writer used for writing. 
+ */ + public AdjacencyListVertexWriter(RecordWriter recordWriter) { super(recordWriter); } @Override public void writeVertex(BasicVertex vertex) throws IOException, - InterruptedException { + InterruptedException { if (delimiter == null) { delimiter = getContext().getConfiguration() - .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); + .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); } StringBuffer sb = new StringBuffer(vertex.getVertexId().toString()); @@ -75,9 +90,8 @@ public class AdjacencyListTextVertexOutp @Override public VertexWriter createVertexWriter(TaskAttemptContext context) - throws IOException, InterruptedException { + throws IOException, InterruptedException { return new AdjacencyListVertexWriter - (textOutputFormat.getRecordWriter(context)); + (textOutputFormat.getRecordWriter(context)); } - } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java Thu Feb 16 22:12:31 2012 @@ -21,6 +21,7 @@ import com.google.common.collect.Maps; import org.apache.giraph.graph.BasicVertex; import org.apache.giraph.graph.BspUtils; import org.apache.giraph.graph.Edge; +import org.apache.giraph.lib.TextVertexInputFormat.TextVertexReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -42,15 +43,17 @@ import java.util.Map; * @param Vertex index value * @param Vertex value * @param Edge value + * @param Message data */ @SuppressWarnings("rawtypes") public abstract class AdjacencyListVertexReader 
extends TextVertexInputFormat.TextVertexReader { - + /** Delimiter for split */ public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter"; + /** Default delimiter for split */ public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t"; - + /** Cached delimiter used for split */ private String splitValue = null; /** @@ -59,17 +62,37 @@ public abstract class AdjacencyListVerte public interface LineSanitizer { /** * Clean string s before attempting to tokenize it. + * + * @param s String to be cleaned. + * @return Sanitized string. */ - public String sanitize(String s); + String sanitize(String s); } - private LineSanitizer sanitizer = null; + /** + * Sanitizer from constructor. + */ + private final LineSanitizer sanitizer; - public AdjacencyListVertexReader(RecordReader lineRecordReader) { + /** + * Constructor with line record reader. + * + * @param lineRecordReader Reader from {@link TextVertexReader}. + */ + public AdjacencyListVertexReader( + RecordReader lineRecordReader) { super(lineRecordReader); + sanitizer = null; } - public AdjacencyListVertexReader(RecordReader lineRecordReader, + /** + * Constructor with line record reader. + * + * @param lineRecordReader Reader from {@link TextVertexReader}. + * @param sanitizer Sanitizer to be used. + */ + public AdjacencyListVertexReader( + RecordReader lineRecordReader, LineSanitizer sanitizer) { super(lineRecordReader); this.sanitizer = sanitizer; @@ -77,17 +100,18 @@ public abstract class AdjacencyListVerte /** * Store the Id for this line in an instance of its correct type. + * * @param s Id of vertex from line * @param id Instance of Id's type, in which to store its value */ - abstract public void decodeId(String s, I id); + public abstract void decodeId(String s, I id); /** * Store the value for this line in an instance of its correct type. 
* @param s Value from line * @param value Instance of value's type, in which to store its value */ - abstract public void decodeValue(String s, V value); + public abstract void decodeValue(String s, V value); /** * Store an edge from the line into an instance of a correctly typed Edge @@ -95,7 +119,7 @@ public abstract class AdjacencyListVerte * @param value The edge's value from the line * @param edge Instance of edge in which to store the id and value */ - abstract public void decodeEdge(String id, String value, Edge edge); + public abstract void decodeEdge(String id, String value, Edge edge); @Override @@ -104,7 +128,8 @@ public abstract class AdjacencyListVerte } @Override - public BasicVertex getCurrentVertex() throws IOException, InterruptedException { + public BasicVertex getCurrentVertex() + throws IOException, InterruptedException { Configuration conf = getContext().getConfiguration(); String line = getRecordReader().getCurrentValue().toString(); BasicVertex vertex = BspUtils.createVertex(conf); @@ -120,7 +145,8 @@ public abstract class AdjacencyListVerte String [] values = line.split(splitValue); if ((values.length < 2) || (values.length % 2 != 0)) { - throw new IllegalArgumentException("Line did not split correctly: " + line); + throw new IllegalArgumentException( + "Line did not split correctly: " + line); } I vertexId = BspUtils.createVertexIndex(conf); @@ -132,7 +158,7 @@ public abstract class AdjacencyListVerte int i = 2; Map edges = Maps.newHashMap(); Edge edge = new Edge(); - while(i < values.length) { + while (i < values.length) { decodeEdge(values[i], values[i + 1], edge); edges.put(edge.getDestVertexId(), edge.getEdgeValue()); i += 2; Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/IdWithValueTextOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/IdWithValueTextOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff 
============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/IdWithValueTextOutputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/IdWithValueTextOutputFormat.java Thu Feb 16 22:12:31 2012 @@ -40,30 +40,45 @@ import java.io.IOException; * @param Edge value */ @SuppressWarnings("rawtypes") -public class IdWithValueTextOutputFormat extends TextVertexOutputFormat{ - +public class IdWithValueTextOutputFormat + extends TextVertexOutputFormat { + + /** + * Vertex writer used with {@link IdWithValueTextOutputFormat}. + * + * @param Vertex id + * @param Vertex data + * @param Edge data + */ static class IdWithValueVertexWriter extends TextVertexWriter { - + /** Specify the output delimiter */ public static final String LINE_TOKENIZE_VALUE = "output.delimiter"; + /** Default output delimiter */ public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t"; - + /** Reverse id and value order? */ public static final String REVERSE_ID_AND_VALUE = "reverse.id.and.value"; + /** Default is to not reverse id and value order. */ public static final boolean REVERSE_ID_AND_VALUE_DEFAULT = false; - + /** Saved delimiter */ private String delimiter; + /** + * Constructor with record writer. + * + * @param recordWriter Writer from LineRecordWriter. 
+ */ public IdWithValueVertexWriter(RecordWriter recordWriter) { super(recordWriter); } @Override public void writeVertex(BasicVertex vertex) throws IOException, - InterruptedException { + InterruptedException { if (delimiter == null) { delimiter = getContext().getConfiguration() - .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); + .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); } String first; @@ -87,9 +102,8 @@ public class IdWithValueTextOutputFormat @Override public VertexWriter createVertexWriter(TaskAttemptContext context) - throws IOException, InterruptedException { + throws IOException, InterruptedException { return new IdWithValueVertexWriter - (textOutputFormat.getRecordWriter(context)); + (textOutputFormat.getRecordWriter(context)); } - } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexFormat.java Thu Feb 16 22:12:31 2012 @@ -21,11 +21,16 @@ package org.apache.giraph.lib; /** * Keeps the vertex keys for the input/output vertex format */ -public interface JsonBase64VertexFormat { - /** Vertex id key */ - public static final String VERTEX_ID_KEY = "vertexId"; - /** Vertex value key*/ - public static final String VERTEX_VALUE_KEY = "vertexValue"; - /** Edge value array key (all the edges are stored here) */ - public static final String EDGE_ARRAY_KEY = "edgeArray"; +public class JsonBase64VertexFormat { + /** Vertex id key */ + public static final String VERTEX_ID_KEY = "vertexId"; + /** Vertex value key*/ + public static final String VERTEX_VALUE_KEY = "vertexValue"; + /** 
Edge value array key (all the edges are stored here) */ + public static final String EDGE_ARRAY_KEY = "edgeArray"; + + /** + * Don't construct. + */ + private JsonBase64VertexFormat() { } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java Thu Feb 16 22:12:31 2012 @@ -51,110 +51,110 @@ import java.util.Map; * @param Vertex index value * @param Vertex value * @param Edge value + * @param Message data */ @SuppressWarnings("rawtypes") -public class JsonBase64VertexInputFormat< - I extends WritableComparable, V extends Writable, E extends Writable, - M extends Writable> - extends TextVertexInputFormat implements - JsonBase64VertexFormat { +public class JsonBase64VertexInputFormat + extends TextVertexInputFormat { + /** + * Simple reader that supports {@link JsonBase64VertexInputFormat} + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message data + */ + private static class JsonBase64VertexReader + extends TextVertexReader { /** - * Simple reader that supports {@link JsonBase64VertexInputFormat} + * Only constructor. Requires the LineRecordReader * - * @param Vertex index value - * @param Vertex value - * @param Edge value + * @param lineRecordReader Line record reader to read from */ - private static class JsonBase64VertexReader< - I extends WritableComparable, V extends Writable, - E extends Writable, M extends Writable> extends TextVertexReader { - /** - * Only constructor. 
Requires the LineRecordReader - * - * @param lineRecordReader Line record reader to read from - */ - public JsonBase64VertexReader(RecordReader lineRecordReader) { - super(lineRecordReader); - } + public JsonBase64VertexReader( + RecordReader lineRecordReader) { + super(lineRecordReader); + } - @Override - public boolean nextVertex() throws IOException, InterruptedException { - return getRecordReader().nextKeyValue(); - } + @Override + public boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } - @Override - public BasicVertex getCurrentVertex() - throws IOException, InterruptedException { - Configuration conf = getContext().getConfiguration(); - BasicVertex vertex = BspUtils.createVertex(conf); + @Override + public BasicVertex getCurrentVertex() + throws IOException, InterruptedException { + Configuration conf = getContext().getConfiguration(); + BasicVertex vertex = BspUtils.createVertex(conf); - Text line = getRecordReader().getCurrentValue(); - JSONObject vertexObject; - try { - vertexObject = new JSONObject(line.toString()); - } catch (JSONException e) { - throw new IllegalArgumentException( - "next: Failed to get the vertex", e); - } - DataInput input = null; - byte[] decodedWritable = null; - I vertexId = null; - try { - decodedWritable = Base64.decode( - vertexObject.getString(VERTEX_ID_KEY)); - input = new DataInputStream( - new ByteArrayInputStream(decodedWritable)); - vertexId = BspUtils.createVertexIndex(conf); - vertexId.readFields(input); - } catch (JSONException e) { - throw new IllegalArgumentException( - "next: Failed to get vertex id", e); - } - V vertexValue = null; - try { - decodedWritable = Base64.decode( - vertexObject.getString(VERTEX_VALUE_KEY)); - input = new DataInputStream( - new ByteArrayInputStream(decodedWritable)); - vertexValue = BspUtils.createVertexValue(conf); - vertexValue.readFields(input); - } catch (JSONException e) { - throw new IllegalArgumentException( - "next: 
Failed to get vertex value", e); - } - JSONArray edgeArray = null; - try { - edgeArray = vertexObject.getJSONArray(EDGE_ARRAY_KEY); - } catch (JSONException e) { - throw new IllegalArgumentException( - "next: Failed to get edge array", e); - } - Map edgeMap = Maps.newHashMap(); - for (int i = 0; i < edgeArray.length(); ++i) { - try { - decodedWritable = - Base64.decode(edgeArray.getString(i)); - } catch (JSONException e) { - throw new IllegalArgumentException( - "next: Failed to get edge value", e); - } - input = new DataInputStream( - new ByteArrayInputStream(decodedWritable)); - Edge edge = new Edge(); - edge.setConf(getContext().getConfiguration()); - edge.readFields(input); - edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue()); - } - vertex.initialize(vertexId, vertexValue, edgeMap, null); - return vertex; + Text line = getRecordReader().getCurrentValue(); + JSONObject vertexObject; + try { + vertexObject = new JSONObject(line.toString()); + } catch (JSONException e) { + throw new IllegalArgumentException( + "next: Failed to get the vertex", e); + } + DataInput input = null; + byte[] decodedWritable = null; + I vertexId = null; + try { + decodedWritable = Base64.decode( + vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY)); + input = new DataInputStream( + new ByteArrayInputStream(decodedWritable)); + vertexId = BspUtils.createVertexIndex(conf); + vertexId.readFields(input); + } catch (JSONException e) { + throw new IllegalArgumentException( + "next: Failed to get vertex id", e); + } + V vertexValue = null; + try { + decodedWritable = Base64.decode( + vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY)); + input = new DataInputStream( + new ByteArrayInputStream(decodedWritable)); + vertexValue = BspUtils.createVertexValue(conf); + vertexValue.readFields(input); + } catch (JSONException e) { + throw new IllegalArgumentException( + "next: Failed to get vertex value", e); + } + JSONArray edgeArray = null; + try { + edgeArray = 
vertexObject.getJSONArray( + JsonBase64VertexFormat.EDGE_ARRAY_KEY); + } catch (JSONException e) { + throw new IllegalArgumentException( + "next: Failed to get edge array", e); + } + Map edgeMap = Maps.newHashMap(); + for (int i = 0; i < edgeArray.length(); ++i) { + try { + decodedWritable = Base64.decode(edgeArray.getString(i)); + } catch (JSONException e) { + throw new IllegalArgumentException( + "next: Failed to get edge value", e); } + input = new DataInputStream( + new ByteArrayInputStream(decodedWritable)); + Edge edge = new Edge(); + edge.setConf(getContext().getConfiguration()); + edge.readFields(input); + edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue()); + } + vertex.initialize(vertexId, vertexValue, edgeMap, null); + return vertex; } + } - @Override - public VertexReader createVertexReader( - InputSplit split, - TaskAttemptContext context) throws IOException { - return new JsonBase64VertexReader(textInputFormat.createRecordReader(split, - context)); - } + @Override + public VertexReader createVertexReader( + InputSplit split, TaskAttemptContext context) throws IOException { + return new JsonBase64VertexReader( + textInputFormat.createRecordReader(split, context)); + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexOutputFormat.java Thu Feb 16 22:12:31 2012 @@ -47,80 +47,80 @@ import java.io.IOException; * @param Edge value */ @SuppressWarnings("rawtypes") -public class JsonBase64VertexOutputFormat< - I extends WritableComparable, V extends Writable, E extends 
Writable> - extends TextVertexOutputFormat - implements JsonBase64VertexFormat { +public class JsonBase64VertexOutputFormat extends + TextVertexOutputFormat { + /** + * Simple writer that supports {@link JsonBase64VertexOutputFormat} + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + */ + private static class JsonBase64VertexWriter extends + TextVertexWriter { /** - * Simple writer that supports {@link JsonBase64VertexOutputFormat} + * Only constructor. Requires the LineRecordWriter * - * @param Vertex index value - * @param Vertex value - * @param Edge value + * @param lineRecordWriter Line record writer to write to */ - private static class JsonBase64VertexWriter< - I extends WritableComparable, V extends Writable, - E extends Writable> extends TextVertexWriter { - /** - * Only constructor. Requires the LineRecordWriter - * - * @param lineRecordWriter Line record writer to write to - */ - public JsonBase64VertexWriter( - RecordWriter lineRecordWriter) { - super(lineRecordWriter); - } - - @Override - public void writeVertex(BasicVertex vertex) - throws IOException, InterruptedException { - ByteArrayOutputStream outputStream = - new ByteArrayOutputStream(); - DataOutput output = new DataOutputStream(outputStream); - JSONObject vertexObject = new JSONObject(); - vertex.getVertexId().write(output); - try { - vertexObject.put( - VERTEX_ID_KEY, - Base64.encodeBytes(outputStream.toByteArray())); - } catch (JSONException e) { - throw new IllegalStateException( - "writerVertex: Failed to insert vertex id", e); - } - outputStream.reset(); - vertex.getVertexValue().write(output); - try { - vertexObject.put( - VERTEX_VALUE_KEY, - Base64.encodeBytes(outputStream.toByteArray())); - } catch (JSONException e) { - throw new IllegalStateException( - "writerVertex: Failed to insert vertex value", e); - } - JSONArray edgeArray = new JSONArray(); - for (I targetVertexId : vertex) { - Edge edge = new Edge( - targetVertexId, 
vertex.getEdgeValue(targetVertexId)); - edge.setConf(getContext().getConfiguration()); - outputStream.reset(); - edge.write(output); - edgeArray.put(Base64.encodeBytes(outputStream.toByteArray())); - } - try { - vertexObject.put(EDGE_ARRAY_KEY, edgeArray); - } catch (JSONException e) { - throw new IllegalStateException( - "writerVertex: Failed to insert edge array", e); - } - getRecordWriter().write(new Text(vertexObject.toString()), null); - } + public JsonBase64VertexWriter( + RecordWriter lineRecordWriter) { + super(lineRecordWriter); } @Override - public VertexWriter createVertexWriter(TaskAttemptContext context) - throws IOException, InterruptedException { - return new JsonBase64VertexWriter( - textOutputFormat.getRecordWriter(context)); + public void writeVertex(BasicVertex vertex) + throws IOException, InterruptedException { + ByteArrayOutputStream outputStream = + new ByteArrayOutputStream(); + DataOutput output = new DataOutputStream(outputStream); + JSONObject vertexObject = new JSONObject(); + vertex.getVertexId().write(output); + try { + vertexObject.put( + JsonBase64VertexFormat.VERTEX_ID_KEY, + Base64.encodeBytes(outputStream.toByteArray())); + } catch (JSONException e) { + throw new IllegalStateException( + "writerVertex: Failed to insert vertex id", e); + } + outputStream.reset(); + vertex.getVertexValue().write(output); + try { + vertexObject.put( + JsonBase64VertexFormat.VERTEX_VALUE_KEY, + Base64.encodeBytes(outputStream.toByteArray())); + } catch (JSONException e) { + throw new IllegalStateException( + "writerVertex: Failed to insert vertex value", e); + } + JSONArray edgeArray = new JSONArray(); + for (I targetVertexId : vertex) { + Edge edge = new Edge( + targetVertexId, vertex.getEdgeValue(targetVertexId)); + edge.setConf(getContext().getConfiguration()); + outputStream.reset(); + edge.write(output); + edgeArray.put(Base64.encodeBytes(outputStream.toByteArray())); + } + try { + vertexObject.put( + JsonBase64VertexFormat.EDGE_ARRAY_KEY, + 
edgeArray); + } catch (JSONException e) { + throw new IllegalStateException( + "writerVertex: Failed to insert edge array", e); + } + getRecordWriter().write(new Text(vertexObject.toString()), null); } + } + @Override + public VertexWriter createVertexWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new JsonBase64VertexWriter( + textOutputFormat.getRecordWriter(context)); + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java Thu Feb 16 22:12:31 2012 @@ -35,19 +35,40 @@ import java.io.IOException; * 22 0.1 45 0.3 99 0.44 * to repesent a vertex with id 22, value of 0.1 and edges to nodes 45 and 99, * with values of 0.3 and 0.44, respectively. + * + * @param Message data */ -public class LongDoubleDoubleAdjacencyListVertexInputFormat extends - TextVertexInputFormat { - +public class LongDoubleDoubleAdjacencyListVertexInputFormat + extends TextVertexInputFormat { + + /** + * VertexReader associated with + * {@link LongDoubleDoubleAdjacencyListVertexInputFormat}. + * + * @param Message data. + */ static class VertexReader extends - AdjacencyListVertexReader { + AdjacencyListVertexReader { + /** + * Constructor with Line record reader. + * + * @param lineRecordReader Reader to internally use. + */ VertexReader(RecordReader lineRecordReader) { super(lineRecordReader); } + /** + * Constructor with Line record reader and sanitizer. 
+ * + * @param lineRecordReader Reader to internally use. + * @param sanitizer Line sanitizer. + */ VertexReader(RecordReader lineRecordReader, - LineSanitizer sanitizer) { + LineSanitizer sanitizer) { super(lineRecordReader, sanitizer); } @@ -62,8 +83,10 @@ public class LongDoubleDoubleAdjacencyLi } @Override - public void decodeEdge(String s1, String s2, Edge - textIntWritableEdge) { + public void decodeEdge( + String s1, + String s2, + Edge textIntWritableEdge) { textIntWritableEdge.setDestVertexId(new LongWritable(Long.valueOf(s1))); textIntWritableEdge.setEdgeValue(new DoubleWritable(Double.valueOf(s2))); } @@ -71,10 +94,10 @@ public class LongDoubleDoubleAdjacencyLi @Override public org.apache.giraph.graph.VertexReader createVertexReader( + DoubleWritable, DoubleWritable, M> createVertexReader( InputSplit split, TaskAttemptContext context) throws IOException { return new VertexReader(textInputFormat.createRecordReader( - split, context)); + split, context)); } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java Thu Feb 16 22:12:31 2012 @@ -31,49 +31,73 @@ import org.apache.hadoop.mapreduce.lib.i import java.io.IOException; import java.util.List; +/** + * Sequence file vertex input format based on {@link SequenceFileInputFormat}. 
+ * + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data + * @param Value type + */ public class SequenceFileVertexInputFormat, - V extends Writable, - E extends Writable, - M extends Writable, - X extends BasicVertex> + V extends Writable, E extends Writable, M extends Writable, + X extends BasicVertex> extends VertexInputFormat { - protected SequenceFileInputFormat sequenceFileInputFormat - = new SequenceFileInputFormat(); + /** Internal input format */ + protected SequenceFileInputFormat sequenceFileInputFormat = + new SequenceFileInputFormat(); - @Override public List getSplits(JobContext context, int numWorkers) - throws IOException, InterruptedException { + @Override + public List getSplits(JobContext context, int numWorkers) + throws IOException, InterruptedException { return sequenceFileInputFormat.getSplits(context); } @Override public VertexReader createVertexReader(InputSplit split, - TaskAttemptContext context) - throws IOException { + TaskAttemptContext context) throws IOException { return new SequenceFileVertexReader( sequenceFileInputFormat.createRecordReader(split, context)); } + /** + * Vertex reader used with {@link SequenceFileVertexInputFormat}. + * + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data + * @param Value type + */ public static class SequenceFileVertexReader, V extends Writable, E extends Writable, M extends Writable, X extends BasicVertex> implements VertexReader { + /** Internal record reader from {@link SequenceFileInputFormat} */ private final RecordReader recordReader; + /** + * Constructor with record reader. + * + * @param recordReader Reader from {@link SequenceFileInputFormat}. 
+ */ public SequenceFileVertexReader(RecordReader recordReader) { this.recordReader = recordReader; } - @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context) - throws IOException, InterruptedException { + @Override public void initialize(InputSplit inputSplit, + TaskAttemptContext context) throws IOException, InterruptedException { recordReader.initialize(inputSplit, context); } - @Override public boolean nextVertex() throws IOException, InterruptedException { + @Override public boolean nextVertex() throws IOException, + InterruptedException { return recordReader.nextKeyValue(); } @Override public BasicVertex getCurrentVertex() - throws IOException, InterruptedException { + throws IOException, InterruptedException { return recordReader.getCurrentValue(); } @@ -82,7 +106,8 @@ public class SequenceFileVertexInputForm recordReader.close(); } - @Override public float getProgress() throws IOException, InterruptedException { + @Override public float getProgress() throws IOException, + InterruptedException { return recordReader.getProgress(); } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java Thu Feb 16 22:12:31 2012 @@ -32,19 +32,38 @@ import java.io.IOException; * Class to read graphs stored as adjacency lists with ids represented by * Strings and values as doubles. 
This is a good inputformat for reading * graphs where the id types do not matter and can be stashed in a String. + * + * @param Message type. */ public class TextDoubleDoubleAdjacencyListVertexInputFormat extends TextVertexInputFormat { - static class VertexReader extends AdjacencyListVertexReader { + /** + * Vertex reader used with + * {@link TextDoubleDoubleAdjacencyListVertexInputFormat} + * + * @param Message type. + */ + static class VertexReader extends + AdjacencyListVertexReader { + /** + * Constructor without sanitizer. + * + * @param lineRecordReader Internal reader. + */ VertexReader(RecordReader lineRecordReader) { super(lineRecordReader); } + /** + * Constructor with {@link LineRecordReader} + * + * @param lineRecordReader Internal reader. + * @param sanitizer Sanitizer of the lines. + */ VertexReader(RecordReader lineRecordReader, - LineSanitizer sanitizer) { + LineSanitizer sanitizer) { super(lineRecordReader, sanitizer); } @@ -59,8 +78,8 @@ public class TextDoubleDoubleAdjacencyLi } @Override - public void decodeEdge(String s1, String s2, Edge - textIntWritableEdge) { + public void decodeEdge(String s1, String s2, + Edge textIntWritableEdge) { textIntWritableEdge.setDestVertexId(new Text(s1)); textIntWritableEdge.setEdgeValue(new DoubleWritable(Double.valueOf(s2))); } @@ -68,11 +87,9 @@ public class TextDoubleDoubleAdjacencyLi @Override public org.apache.giraph.graph.VertexReader createVertexReader( - InputSplit split, + DoubleWritable, M> createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException { return new VertexReader(textInputFormat.createRecordReader( - split, context)); + split, context)); } - } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java Thu Feb 16 22:12:31 2012 @@ -43,85 +43,80 @@ import java.util.List; * @param Message value */ @SuppressWarnings("rawtypes") -public abstract class TextVertexInputFormat< - I extends WritableComparable, - V extends Writable, - E extends Writable, - M extends Writable> - extends VertexInputFormat { - /** Uses the TextInputFormat to do everything */ - protected TextInputFormat textInputFormat = new TextInputFormat(); +public abstract class TextVertexInputFormat + extends VertexInputFormat { + /** Uses the TextInputFormat to do everything */ + protected TextInputFormat textInputFormat = new TextInputFormat(); + + /** + * Abstract class to be implemented by the user based on their specific + * vertex input. Easiest to ignore the key value separator and only use + * key instead. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + */ + public abstract static class TextVertexReader + implements VertexReader { + /** Internal line record reader */ + private final RecordReader lineRecordReader; + /** Context passed to initialize */ + private TaskAttemptContext context; /** - * Abstract class to be implemented by the user based on their specific - * vertex input. Easiest to ignore the key value separator and only use - * key instead. + * Initialize with the LineRecordReader. 
* - * @param Vertex index value - * @param Vertex value - * @param Edge value + * @param lineRecordReader Line record reader from TextInputFormat */ - public static abstract class TextVertexReader - implements VertexReader { - /** Internal line record reader */ - private final RecordReader lineRecordReader; - /** Context passed to initialize */ - private TaskAttemptContext context; - - /** - * Initialize with the LineRecordReader. - * - * @param lineRecordReader Line record reader from TextInputFormat - */ - public TextVertexReader( - RecordReader lineRecordReader) { - this.lineRecordReader = lineRecordReader; - } - - @Override - public void initialize(InputSplit inputSplit, - TaskAttemptContext context) - throws IOException, InterruptedException { - lineRecordReader.initialize(inputSplit, context); - this.context = context; - } - - @Override - public void close() throws IOException { - lineRecordReader.close(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return lineRecordReader.getProgress(); - } - - /** - * Get the line record reader. - * - * @return Record reader to be used for reading. - */ - protected RecordReader getRecordReader() { - return lineRecordReader; - } - - /** - * Get the context. - * - * @return Context passed to initialize. 
- */ - protected TaskAttemptContext getContext() { - return context; - } + public TextVertexReader( + RecordReader lineRecordReader) { + this.lineRecordReader = lineRecordReader; } @Override - public List getSplits( - JobContext context, int numWorkers) - throws IOException, InterruptedException { - // Ignore the hint of numWorkers here since we are using TextInputFormat - // to do this for us - return textInputFormat.getSplits(context); + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + lineRecordReader.initialize(inputSplit, context); + this.context = context; } + + @Override + public void close() throws IOException { + lineRecordReader.close(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return lineRecordReader.getProgress(); + } + + /** + * Get the line record reader. + * + * @return Record reader to be used for reading. + */ + protected RecordReader getRecordReader() { + return lineRecordReader; + } + + /** + * Get the context. + * + * @return Context passed to initialize. + */ + protected TaskAttemptContext getContext() { + return context; + } + } + + @Override + public List getSplits(JobContext context, int numWorkers) + throws IOException, InterruptedException { + // Ignore the hint of numWorkers here since we are using TextInputFormat + // to do this for us + return textInputFormat.getSplits(context); + } }