Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5D3A4E066 for ; Thu, 7 Mar 2013 05:37:53 +0000 (UTC) Received: (qmail 47818 invoked by uid 500); 7 Mar 2013 05:37:53 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 47668 invoked by uid 500); 7 Mar 2013 05:37:52 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 47254 invoked by uid 99); 7 Mar 2013 05:37:42 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Mar 2013 05:37:42 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1825B831161; Thu, 7 Mar 2013 05:37:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: apresta@apache.org To: commits@giraph.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [6/8] GIRAPH-528: Decouple vertex implementation from edge storage (apresta) Message-Id: <20130307053742.1825B831161@tyr.zones.apache.org> Date: Thu, 7 Mar 2013 05:37:41 +0000 (UTC) http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java new file mode 100644 index 0000000..36381a7 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java @@ -0,0 +1,42 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.edge; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Interface for {@link VertexEdges} implementations that provide efficient + * random access to the edges given the target vertex id. + * This version is for strict graphs (i.e. assumes no parallel edges). + * + * @param Vertex id + * @param Edge value + */ +public interface StrictRandomAccessVertexEdges extends VertexEdges { + /** + * Return the edge value for the given target vertex id (or null if there + * is no edge pointing to it). 
+ * + * @param targetVertexId Target vertex id + * @return Edge value + */ + E getEdgeValue(I targetVertexId); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/VertexEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/VertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/VertexEdges.java new file mode 100644 index 0000000..bb885b7 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/edge/VertexEdges.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.edge; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Interface for data structures that store out-edges for a vertex. + * + * @param Vertex id + * @param Edge value + */ +public interface VertexEdges + extends Iterable>, Writable { + /** + * Initialize the data structure and set the edges from an iterable. + * This method (or one of the two alternatives) must be called + * after instantiation, unless readFields() is called. 
+ * Note: whether parallel edges are allowed or not depends on the + * implementation. + * + * @param edges Iterable of edges + */ + void initialize(Iterable> edges); + + /** + * Initialize the data structure with the specified initial capacity. + * This method (or one of the two alternatives) must be called + * after instantiation, unless readFields() is called. + * + * @param capacity Initial capacity + */ + void initialize(int capacity); + + /** + * Initialize the data structure with the default initial capacity. + * This method (or one of the two alternatives) must be called + * after instantiation, unless readFields() is called. + * + */ + void initialize(); + + /** + * Add an edge. + * Note: whether parallel edges are allowed or not depends on the + * implementation. + * + * @param edge Edge to add + */ + void add(Edge edge); + + /** + * Remove all edges to the given target vertex. + * Note: the implementation will vary depending on whether parallel edges + * are allowed or not. + * + * @param targetVertexId Target vertex id + */ + void remove(I targetVertexId); + + /** + * Return the number of edges. + * + * @return Number of edges + */ + int size(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/package-info.java b/giraph-core/src/main/java/org/apache/giraph/edge/package-info.java new file mode 100644 index 0000000..2281509 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/edge/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package of Vertex implementations. + */ +package org.apache.giraph.edge; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index c7aff7c..439ee5b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -34,7 +34,6 @@ import org.apache.giraph.time.Time; import org.apache.giraph.time.Times; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.TimedLogger; -import org.apache.giraph.vertex.Vertex; import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerThreadAggregatorUsage; import org.apache.hadoop.io.Writable; @@ -212,8 +211,7 @@ public class ComputeCallable messages = - messageStore.getVertexMessages(vertex.getId()); + Iterable messages = messageStore.getVertexMessages(vertex.getId()); if (vertex.isHalted() && !Iterables.isEmpty(messages)) { vertex.wakeUp(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/DefaultEdge.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultEdge.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultEdge.java deleted file mode 100644 index 039f0d7..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultEdge.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.graph; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.google.common.base.Objects; - -/** - * A complete edge, the target vertex and the edge value. Can only be one - * edge with a destination vertex id per edge map. - * - * @param Vertex index - * @param Edge value - */ -@SuppressWarnings("rawtypes") -public class DefaultEdge - implements MutableEdge { - /** Target vertex id */ - private I targetVertexId = null; - /** Edge value */ - private E value = null; - - /** - * Constructor for reflection - */ - public DefaultEdge() { } - - /** - * Create the edge with final values. Don't call, use EdgeFactory instead. - * - * @param targetVertexId Desination vertex id. - * @param value Value of the edge. 
- */ - DefaultEdge(I targetVertexId, E value) { - this.targetVertexId = targetVertexId; - this.value = value; - } - - @Override - public I getTargetVertexId() { - return targetVertexId; - } - - @Override - public E getValue() { - return value; - } - - @Override - public void setTargetVertexId(I targetVertexId) { - this.targetVertexId = targetVertexId; - } - - @Override - public void setValue(E value) { - this.value = value; - } - - @Override - public String toString() { - return "(TargetVertexId = " + targetVertexId + ", " + - "value = " + value + ")"; - } - - @SuppressWarnings("unchecked") - @Override - public int compareTo(Edge edge) { - return targetVertexId.compareTo(edge.getTargetVertexId()); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - DefaultEdge edge = (DefaultEdge) o; - return Objects.equal(targetVertexId, edge.targetVertexId) && - Objects.equal(value, edge.value); - } - - @Override - public int hashCode() { - return Objects.hashCode(targetVertexId, value); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java index c88b2b9..52df38d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java @@ -20,8 +20,7 @@ package org.apache.giraph.graph; import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.vertex.MutableVertex; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.edge.Edge; import 
org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.log4j.Logger; @@ -87,9 +86,8 @@ public class DefaultVertexResolver mv = (MutableVertex) vertex; for (I removedDestVertex : vertexChanges.getRemovedEdgeList()) { - mv.removeEdges(removedDestVertex); + vertex.removeEdges(removedDestVertex); } } } @@ -156,9 +154,8 @@ public class DefaultVertexResolver mv = (MutableVertex) vertex; for (Edge edge : vertexChanges.getAddedEdgeList()) { - mv.addEdge(edge); + vertex.addEdge(edge); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/Edge.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Edge.java b/giraph-core/src/main/java/org/apache/giraph/graph/Edge.java deleted file mode 100644 index 185e3c3..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/graph/Edge.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.graph; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * A complete edge, the target vertex and the edge value. Can only be one - * edge with a destination vertex id per edge map. - * - * @param Vertex index - * @param Edge value - */ -public interface Edge - extends Comparable> { - /** - * Get the target vertex index of this edge - * - * @return Target vertex index of this edge - */ - I getTargetVertexId(); - - /** - * Get the edge value of the edge - * - * @return Edge value of this edge - */ - E getValue(); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/EdgeFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeFactory.java deleted file mode 100644 index a3e6efb..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeFactory.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.graph; - -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Factory for creating Edges - */ -public class EdgeFactory { - /** Do not construct */ - private EdgeFactory() { } - - /** - * Create an edge pointing to a given ID with a value - * - * @param id target ID - * @param value edge value - * @param Vertex ID type - * @param Edge Value type - * @return Edge pointing to ID with value - */ - public static - Edge create(I id, E value) { - return createMutable(id, value); - } - - /** - * Create an edge pointing to a given ID without a value - * - * @param id target ID - * @param Vertex ID type - * @return Edge pointing to ID without a value - */ - public static - Edge create(I id) { - return createMutable(id); - } - - /** - * Create a mutable edge pointing to a given ID with a value - * - * @param id target ID - * @param value edge value - * @param Vertex ID type - * @param Edge Value type - * @return Edge pointing to ID with value - */ - public static - MutableEdge createMutable(I id, E value) { - if (value instanceof NullWritable) { - return (MutableEdge) createMutable(id); - } else { - return new DefaultEdge(id, value); - } - } - - /** - * Create a mutable edge pointing to a given ID with a value - * - * @param id target ID - * @param Vertex ID type - * @return Edge pointing to ID with value - */ - public static - MutableEdge createMutable(I id) { - return new EdgeNoValue(id); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/EdgeNoValue.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeNoValue.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeNoValue.java deleted file mode 100644 index 4ac6759..0000000 --- 
a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeNoValue.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.graph; - -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.WritableComparable; - -import com.google.common.base.Objects; - -/** - * An edge that has no value. - * - * @param Vertex ID - */ -public class EdgeNoValue - implements MutableEdge { - /** Target vertex id */ - private I targetVertexId = null; - - /** Empty constructor */ - EdgeNoValue() { } - - /** - * Constructor with target vertex ID. Don't call, use EdgeFactory instead. 
- * - * @param targetVertexId vertex ID - */ - EdgeNoValue(I targetVertexId) { - this.targetVertexId = targetVertexId; - } - - @Override - public void setTargetVertexId(I targetVertexId) { - this.targetVertexId = targetVertexId; - } - - @Override - public void setValue(NullWritable value) { - // do nothing - } - - @Override - public I getTargetVertexId() { - return targetVertexId; - } - - @Override - public NullWritable getValue() { - return NullWritable.get(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - EdgeNoValue edge = (EdgeNoValue) o; - return Objects.equal(targetVertexId, edge.targetVertexId); - } - - @Override - public int hashCode() { - return Objects.hashCode(targetVertexId); - } - - @Override - public int compareTo(Edge o) { - return targetVertexId.compareTo(o.getTargetVertexId()); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java deleted file mode 100644 index 6210367..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.graph; - -import com.google.common.collect.MapMaker; -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.partition.Partition; -import org.apache.giraph.utils.ByteArrayEdges; -import org.apache.giraph.utils.ByteArrayVertexIdEdges; -import org.apache.giraph.vertex.Vertex; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.util.Progressable; -import org.apache.log4j.Logger; - -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - -/** - * Collects incoming edges for vertices owned by this worker. - * Note: the current implementation is simply a bridge between - * incoming requests and vertices. In the future, EdgeStore may become an - * interface allowing for alternative, pluggable implementations of edge - * storage without having to extend Vertex. - * - * @param Vertex id - * @param Vertex value - * @param Edge value - * @param Message data - */ -public class EdgeStore { - /** Class logger */ - private static final Logger LOG = Logger.getLogger(EdgeStore.class); - /** Service worker. */ - private CentralizedServiceWorker service; - /** Giraph configuration. */ - private ImmutableClassesGiraphConfiguration configuration; - /** Progressable to report progress. */ - private Progressable progressable; - /** Map used to temporarily store incoming edges. 
*/ - private ConcurrentMap>> transientEdges; - /** - * Whether we should reuse edge objects (cached to avoid expensive calls - * to the configuration). - */ - private boolean reuseIncomingEdgeObjects; - - /** - * Constructor. - * - * @param service Service worker - * @param configuration Configuration - * @param progressable Progressable - */ - public EdgeStore( - CentralizedServiceWorker service, - ImmutableClassesGiraphConfiguration configuration, - Progressable progressable) { - this.service = service; - this.configuration = configuration; - this.progressable = progressable; - reuseIncomingEdgeObjects = configuration.reuseIncomingEdgeObjects(); - transientEdges = new MapMaker().concurrencyLevel( - configuration.getNettyServerExecutionConcurrency()).makeMap(); - } - - /** - * Add edges belonging to a given partition on this worker. - * Note: This method is thread-safe. - * - * @param partitionId Partition id for the incoming edges. - * @param edges Incoming edges - */ - public void addPartitionEdges( - int partitionId, ByteArrayVertexIdEdges edges) { - ConcurrentMap> partitionEdges = - transientEdges.get(partitionId); - if (partitionEdges == null) { - ConcurrentMap> newPartitionEdges = - new MapMaker().concurrencyLevel( - configuration.getNettyServerExecutionConcurrency()).makeMap(); - partitionEdges = transientEdges.putIfAbsent(partitionId, - newPartitionEdges); - if (partitionEdges == null) { - partitionEdges = newPartitionEdges; - } - } - ByteArrayVertexIdEdges.VertexIdEdgeIterator vertexIdEdgeIterator = - edges.getVertexIdEdgeIterator(); - while (vertexIdEdgeIterator.hasNext()) { - vertexIdEdgeIterator.next(); - I vertexId = vertexIdEdgeIterator.getCurrentVertexId(); - Edge edge = vertexIdEdgeIterator.getCurrentEdge(); - ByteArrayEdges vertexEdges = partitionEdges.get(vertexId); - if (vertexEdges == null) { - ByteArrayEdges newVertexEdges = - new ByteArrayEdges(configuration); - vertexEdges = partitionEdges.putIfAbsent(vertexId, newVertexEdges); - if 
(vertexEdges == null) { - vertexEdges = newVertexEdges; - // Since we had to use the vertex id as a new key in the map, - // we need to release the object. - vertexIdEdgeIterator.releaseCurrentVertexId(); - } - } - synchronized (vertexEdges) { - vertexEdges.appendEdge(edge); - } - } - } - - /** - * Move all edges from temporary storage to their source vertices. - * Note: this method is not thread-safe. - */ - public void moveEdgesToVertices() { - if (LOG.isInfoEnabled()) { - LOG.info("moveEdgesToVertices: Moving incoming edges to vertices."); - } - for (Map.Entry>> partitionEdges : transientEdges.entrySet()) { - Partition partition = - service.getPartitionStore().getPartition(partitionEdges.getKey()); - for (I vertexId : partitionEdges.getValue().keySet()) { - // Depending on whether the vertex implementation keeps references to - // the Edge objects or not, we may be able to reuse objects when - // iterating. - Iterable> edgesIterable = reuseIncomingEdgeObjects ? - partitionEdges.getValue().remove(vertexId) : - partitionEdges.getValue().remove(vertexId).copyEdgeIterable(); - Vertex vertex = partition.getVertex(vertexId); - // If the source vertex doesn't exist, create it. Otherwise, - // just set the edges. - if (vertex == null) { - vertex = configuration.createVertex(); - vertex.initialize(vertexId, configuration.createVertexValue(), - edgesIterable); - partition.putVertex(vertex); - } else { - vertex.setEdges(edgesIterable); - // Some Partition implementations (e.g. ByteArrayPartition) require - // us to put back the vertex after modifying it. - partition.saveVertex(vertex); - } - progressable.progress(); - } - // Some PartitionStore implementations (e.g. DiskBackedPartitionStore) - // require us to put back the partition after modifying it. 
- service.getPartitionStore().putPartition(partition); - } - if (LOG.isInfoEnabled()) { - LOG.info("moveEdgesToVertices: Finished moving incoming edges to " + - "vertices."); - } - transientEdges.clear(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java index 726c21e..3c2286d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java @@ -17,7 +17,6 @@ */ package org.apache.giraph.graph; -import org.apache.giraph.vertex.Vertex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 20fa5c5..e74c59a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -18,8 +18,9 @@ package org.apache.giraph.graph; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.giraph.bsp.BspService; -import org.apache.giraph.bsp.BspUtils; import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.messages.MessageStoreByPartition; @@ -41,7 +42,6 @@ 
import org.apache.giraph.time.Time; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.ProgressableUtils; import org.apache.giraph.utils.ReflectionUtils; -import org.apache.giraph.vertex.Vertex; import org.apache.giraph.worker.BspServiceWorker; import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.giraph.worker.WorkerContext; @@ -57,9 +57,6 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import java.io.IOException; import java.lang.reflect.Type; import java.net.URL; @@ -433,8 +430,10 @@ public class GraphTaskManager> vertexClass = - BspUtils.getVertexClass(conf); + giraphConf.getVertexClass(); List> classList = ReflectionUtils.getTypeArguments( Vertex.class, vertexClass); Type vertexIndexType = classList.get(0); http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/MutableEdge.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MutableEdge.java b/giraph-core/src/main/java/org/apache/giraph/graph/MutableEdge.java deleted file mode 100644 index 52e4c47..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/graph/MutableEdge.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.graph; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * A complete edge, the target vertex and the edge value. Can only be one - * edge with a destination vertex id per edge map. This edge can be mutated, - * that is you can set it's target vertex ID and edge value. - * - * @param Vertex index - * @param Edge value - */ -public interface MutableEdge - extends Edge { - /** - * Set the destination vertex index of this edge. - * - * @param targetVertexId new destination vertex - */ - void setTargetVertexId(I targetVertexId); - - /** - * Set the value for this edge. - * - * @param value new edge value - */ - void setValue(E value); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/ReverseEdgeDuplicator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ReverseEdgeDuplicator.java b/giraph-core/src/main/java/org/apache/giraph/graph/ReverseEdgeDuplicator.java deleted file mode 100644 index 4415cc2..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ReverseEdgeDuplicator.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.graph; - -import org.apache.giraph.io.EdgeReader; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; - -/** - * An EdgeReader that creates the opposite direction edge for each edge read. - * Used to create an undirected graph from a directed input. - * This class is a decorator around any other EdgeReader. 
- * - * @param Vertex ID - * @param Edge Value - */ -public class ReverseEdgeDuplicator implements EdgeReader { - /** The underlying EdgeReader to wrap */ - private final EdgeReader baseReader; - - /** Whether the reverse edge stored currently is valid */ - private boolean haveReverseEdge = true; - /** Reverse of the edge last read */ - private Edge reverseEdge; - /** Reverse source of last edge, in other words last edge's target */ - private I reverseSourceId; - - /** - * Constructor - * @param baseReader EdgeReader to wrap - */ - public ReverseEdgeDuplicator(EdgeReader baseReader) { - this.baseReader = baseReader; - } - - /** - * Get wrapped EdgeReader - * @return EdgeReader - */ - public EdgeReader getBaseReader() { - return baseReader; - } - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext context) - throws IOException, InterruptedException { - baseReader.initialize(inputSplit, context); - haveReverseEdge = true; - } - - @Override - public boolean nextEdge() throws IOException, InterruptedException { - boolean result = true; - if (haveReverseEdge) { - result = baseReader.nextEdge(); - haveReverseEdge = false; - } else { - Edge currentEdge = baseReader.getCurrentEdge(); - reverseSourceId = currentEdge.getTargetVertexId(); - reverseEdge = EdgeFactory.create(baseReader.getCurrentSourceId(), - currentEdge.getValue()); - haveReverseEdge = true; - } - return result; - } - - @Override - public I getCurrentSourceId() throws IOException, InterruptedException { - if (haveReverseEdge) { - return reverseSourceId; - } else { - return baseReader.getCurrentSourceId(); - } - } - - @Override - public Edge getCurrentEdge() throws IOException, InterruptedException { - if (haveReverseEdge) { - return reverseEdge; - } else { - return baseReader.getCurrentEdge(); - } - } - - @Override - public void close() throws IOException { - baseReader.close(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return 
baseReader.getProgress(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java new file mode 100644 index 0000000..c8abab2 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.graph; + +import com.google.common.collect.UnmodifiableIterator; +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.MultiRandomAccessVertexEdges; +import org.apache.giraph.edge.StrictRandomAccessVertexEdges; +import org.apache.giraph.edge.VertexEdges; +import org.apache.giraph.partition.PartitionContext; +import org.apache.giraph.worker.WorkerAggregatorUsage; +import org.apache.giraph.worker.WorkerContext; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; + +/** + * Basic abstract class for writing a BSP application for computation. + * + * @param Vertex id + * @param Vertex data + * @param Edge data + * @param Message data + */ +public abstract class Vertex + implements WorkerAggregatorUsage, Writable, + ImmutableClassesGiraphConfigurable { + /** Vertex id. */ + private I id; + /** Vertex value. */ + private V value; + /** Outgoing edges. */ + private VertexEdges edges; + /** If true, do not do anymore computation on this vertex. */ + private boolean halt; + /** Global graph state **/ + private GraphState graphState; + /** Configuration */ + private ImmutableClassesGiraphConfiguration conf; + + /** + * Initialize id, value, and edges. + * This method (or the alternative form initialize(id, value)) must be called + * after instantiation, unless readFields() is called. + * + * @param id Vertex id + * @param value Vertex value + * @param edges Iterable of edges + */ + public void initialize(I id, V value, Iterable> edges) { + this.id = id; + this.value = value; + setEdges(edges); + } + + /** + * Initialize id and value. Vertex edges will be empty. 
+ * This method (or the alternative form initialize(id, value, edges)) + * must be called after instantiation, unless readFields() is called. + * + * @param id Vertex id + * @param value Vertex value + */ + public void initialize(I id, V value) { + this.id = id; + this.value = value; + this.edges = conf.createAndInitializeVertexEdges(0); + } + + /** + * Set the outgoing edges for this vertex. + * + * @param edges Iterable of edges + */ + public void setEdges(Iterable> edges) { + // If the iterable is actually an instance of VertexEdges, + // we simply take the reference. + // Otherwise, we initialize a new VertexEdges. + if (edges instanceof VertexEdges) { + this.edges = (VertexEdges) edges; + } else { + this.edges = conf.createAndInitializeVertexEdges(edges); + } + } + + /** + * Must be defined by user to do computation on a single Vertex. + * + * @param messages Messages that were sent to this vertex in the previous + * superstep. Each message is only guaranteed to have + * a life expectancy as long as next() is not called. + * @throws IOException + */ + public abstract void compute(Iterable messages) throws IOException; + + /** + * Retrieves the current superstep. + * + * @return Current superstep + */ + public long getSuperstep() { + return graphState.getSuperstep(); + } + + /** + * Get the vertex id. + * + * @return My vertex id. + */ + public I getId() { + return id; + } + + /** + * Get the vertex value (data stored with vertex) + * + * @return Vertex value + */ + public V getValue() { + return value; + } + + /** + * Set the vertex data (immediately visible in the computation) + * + * @param value Vertex data to be set + */ + public void setValue(V value) { + this.value = value; + } + + /** + * Get the total (all workers) number of vertices that + * existed in the previous superstep. 
+ * + * @return Total number of vertices (-1 if first superstep) + */ + public long getTotalNumVertices() { + return graphState.getTotalNumVertices(); + } + + /** + * Get the total (all workers) number of edges that + * existed in the previous superstep. + * + * @return Total number of edges (-1 if first superstep) + */ + public long getTotalNumEdges() { + return graphState.getTotalNumEdges(); + } + + /** + * Get a read-only view of the out-edges of this vertex. + * Note: edge objects returned by this iterable may be invalidated as soon + * as the next element is requested. Thus, keeping a reference to an edge + * almost always leads to undesired behavior. + * + * @return the out edges (sort order determined by subclass implementation). + */ + public Iterable> getEdges() { + return edges; + } + + /** + * Get the number of outgoing edges on this vertex. + * + * @return the total number of outbound edges from this vertex + */ + public int getNumEdges() { + return edges.size(); + } + + /** + * Return the value of the first edge with the given target vertex id, + * or null if there is no such edge. + * Note: edge value objects returned by this method may be invalidated by + * the next call. Thus, keeping a reference to an edge value almost always + * leads to undesired behavior. + * + * @param targetVertexId Target vertex id + * @return Edge value (or null if missing) + */ + public E getEdgeValue(I targetVertexId) { + if (edges instanceof StrictRandomAccessVertexEdges) { + return ((StrictRandomAccessVertexEdges) edges) + .getEdgeValue(targetVertexId); + } else { + for (Edge edge : edges) { + if (edge.getTargetVertexId().equals(targetVertexId)) { + return edge.getValue(); + } + } + return null; + } + } + + /** + * Get an iterable over the values of all edges with the given target + * vertex id. This only makes sense for multigraphs (i.e. graphs with + * parallel edges). 
+ * Note: edge value objects returned by this method may be invalidated as + * soon as the next element is requested. Thus, keeping a reference to an + * edge value almost always leads to undesired behavior. + * + * @param targetVertexId Target vertex id + * @return Iterable of edge values + */ + public Iterable getAllEdgeValues(final I targetVertexId) { + if (edges instanceof MultiRandomAccessVertexEdges) { + return ((MultiRandomAccessVertexEdges) edges) + .getAllEdgeValues(targetVertexId); + } else { + return new Iterable() { + @Override + public Iterator iterator() { + return new UnmodifiableIterator() { + /** Iterator over all edges. */ + private Iterator> edgeIterator = edges.iterator(); + /** Last matching edge found. */ + private Edge currentEdge; + + @Override + public boolean hasNext() { + while (edgeIterator.hasNext()) { + currentEdge = edgeIterator.next(); + if (currentEdge.getTargetVertexId().equals(targetVertexId)) { + return true; + } + } + return false; + } + + @Override + public E next() { + return currentEdge.getValue(); + } + }; + } + }; + } + } + + /** + * Send a message to a vertex id. The message should not be mutated after + * this method returns or else undefined results could occur. + * + * @param id Vertex id to send the message to + * @param message Message data to send. Note that after the message is sent, + * the user should not modify the object. + */ + public void sendMessage(I id, M message) { + if (graphState.getWorkerClientRequestProcessor(). + sendMessageRequest(id, message)) { + graphState.getGraphTaskManager().notifySentMessages(); + } + } + + /** + * Send a message to all edges. + * + * @param message Message sent to all edges. + */ + public void sendMessageToAllEdges(M message) { + for (Edge edge : getEdges()) { + sendMessage(edge.getTargetVertexId(), message); + } + } + + /** + * After this is called, the compute() code will no longer be called for + * this vertex unless a message is sent to it. 
Then the compute() code + * will be called once again until this function is called. The + * application finishes only when all vertices vote to halt. + */ + public void voteToHalt() { + halt = true; + } + + /** + * Re-activate vertex if halted. + */ + public void wakeUp() { + halt = false; + } + + /** + * Is this vertex done? + * + * @return True if halted, false otherwise. + */ + public boolean isHalted() { + return halt; + } + + /** + * Add an edge for this vertex (happens immediately) + * + * @param edge Edge to add + */ + public void addEdge(Edge edge) { + edges.add(edge); + } + + /** + * Removes all edges pointing to the given vertex id. + * + * @param targetVertexId the target vertex id + */ + public void removeEdges(I targetVertexId) { + edges.remove(targetVertexId); + } + + /** + * Sends a request to create a vertex that will be available during the + * next superstep. + * + * @param id Vertex id + * @param value Vertex value + * @param edges Initial edges + */ + public void addVertexRequest(I id, V value, VertexEdges edges) + throws IOException { + Vertex vertex = conf.createVertex(); + vertex.initialize(id, value, edges); + graphState.getWorkerClientRequestProcessor().addVertexRequest(vertex); + } + + /** + * Sends a request to create a vertex that will be available during the + * next superstep. + * + * @param id Vertex id + * @param value Vertex value + */ + public void addVertexRequest(I id, V value) throws IOException { + addVertexRequest(id, value, conf.createVertexEdges()); + } + + /** + * Request to remove a vertex from the graph + * (applied just prior to the next superstep). + * + * @param vertexId Id of the vertex to be removed. + */ + public void removeVertexRequest(I vertexId) throws IOException { + graphState.getWorkerClientRequestProcessor(). 
+ removeVertexRequest(vertexId); + } + + /** + * Request to add an edge of a vertex in the graph + * (processed just prior to the next superstep) + * + * @param sourceVertexId Source vertex id of edge + * @param edge Edge to add + */ + public void addEdgeRequest(I sourceVertexId, Edge edge) + throws IOException { + graphState.getWorkerClientRequestProcessor(). + addEdgeRequest(sourceVertexId, edge); + } + + /** + * Request to remove all edges from a given source vertex to a given target + * vertex (processed just prior to the next superstep). + * + * @param sourceVertexId Source vertex id + * @param targetVertexId Target vertex id + */ + public void removeEdgesRequest(I sourceVertexId, I targetVertexId) + throws IOException { + graphState.getWorkerClientRequestProcessor(). + removeEdgesRequest(sourceVertexId, targetVertexId); + } + + /** + * Set the graph state for all workers + * + * @param graphState Graph state for all workers + */ + public void setGraphState(GraphState graphState) { + this.graphState = graphState; + } + + /** + * Get the mapper context + * + * @return Mapper context + */ + public Mapper.Context getContext() { + return graphState.getContext(); + } + + /** + * Get the partition context + * + * @return Partition context + */ + public PartitionContext getPartitionContext() { + return graphState.getPartitionContext(); + } + + /** + * Get the worker context + * + * @return WorkerContext context + */ + public WorkerContext getWorkerContext() { + return graphState.getGraphTaskManager().getWorkerContext(); + } + + @Override + public void aggregate(String name, A value) { + graphState.getWorkerAggregatorUsage().aggregate(name, value); + } + + @Override + public A getAggregatedValue(String name) { + return graphState.getWorkerAggregatorUsage().getAggregatedValue(name); + } + + @Override + public void readFields(DataInput in) throws IOException { + id = conf.createVertexId(); + id.readFields(in); + value = conf.createVertexValue(); + value.readFields(in); 
+ edges = conf.createVertexEdges(); + edges.readFields(in); + halt = in.readBoolean(); + } + + @Override + public void write(DataOutput out) throws IOException { + id.write(out); + value.write(out); + edges.write(out); + out.writeBoolean(halt); + } + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + return conf; + } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration conf) { + this.conf = conf; + } + + @Override + public String toString() { + return "Vertex(id=" + getId() + ",value=" + getValue() + + ",#edges=" + getNumEdges() + ")"; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java index ef61dbb..9474636 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java @@ -18,7 +18,7 @@ package org.apache.giraph.graph; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.edge.Edge; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java index fa33341..ea50f25 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java @@ -20,8 +20,8 @@ package org.apache.giraph.graph; import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; 
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; import org.apache.giraph.utils.WritableUtils; -import org.apache.giraph.vertex.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.json.JSONException; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java index 4a36706..1fc0ddc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java @@ -18,7 +18,6 @@ package org.apache.giraph.graph; -import org.apache.giraph.vertex.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java index ed6fad1..f6dccdc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java @@ -18,7 +18,7 @@ package org.apache.giraph.io; -import org.apache.giraph.graph.Edge; +import org.apache.giraph.edge.Edge; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/ReverseEdgeDuplicator.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/ReverseEdgeDuplicator.java b/giraph-core/src/main/java/org/apache/giraph/io/ReverseEdgeDuplicator.java new file mode 100644 index 0000000..e85931f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/ReverseEdgeDuplicator.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * An EdgeReader that creates the opposite direction edge for each edge read. + * Used to create an undirected graph from a directed input. + * This class is a decorator around any other EdgeReader. 
+ * + * @param Vertex ID + * @param Edge Value + */ +public class ReverseEdgeDuplicator implements EdgeReader { + /** The underlying EdgeReader to wrap */ + private final EdgeReader baseReader; + + /** Whether the reverse edge stored currently is valid */ + private boolean haveReverseEdge = true; + /** Reverse of the edge last read */ + private Edge reverseEdge; + /** Reverse source of last edge, in other words last edge's target */ + private I reverseSourceId; + + /** + * Constructor + * @param baseReader EdgeReader to wrap + */ + public ReverseEdgeDuplicator(EdgeReader baseReader) { + this.baseReader = baseReader; + } + + /** + * Get wrapped EdgeReader + * @return EdgeReader + */ + public EdgeReader getBaseReader() { + return baseReader; + } + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + baseReader.initialize(inputSplit, context); + haveReverseEdge = true; + } + + @Override + public boolean nextEdge() throws IOException, InterruptedException { + boolean result = true; + if (haveReverseEdge) { + result = baseReader.nextEdge(); + haveReverseEdge = false; + } else { + Edge currentEdge = baseReader.getCurrentEdge(); + reverseSourceId = currentEdge.getTargetVertexId(); + reverseEdge = EdgeFactory.create(baseReader.getCurrentSourceId(), + currentEdge.getValue()); + haveReverseEdge = true; + } + return result; + } + + @Override + public I getCurrentSourceId() throws IOException, InterruptedException { + if (haveReverseEdge) { + return reverseSourceId; + } else { + return baseReader.getCurrentSourceId(); + } + } + + @Override + public Edge getCurrentEdge() throws IOException, InterruptedException { + if (haveReverseEdge) { + return reverseEdge; + } else { + return baseReader.getCurrentEdge(); + } + } + + @Override + public void close() throws IOException { + baseReader.close(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 
baseReader.getProgress(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java index 1b1c896..3487cee 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java @@ -18,7 +18,7 @@ package org.apache.giraph.io; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java index 923ca5c..3ccb0fd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java @@ -19,7 +19,7 @@ package org.apache.giraph.io; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java 
b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java index 82a19bb..38c5548 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java @@ -20,7 +20,7 @@ package org.apache.giraph.io; import java.io.IOException; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.TaskAttemptContext; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java index 5092352..8fe0db6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java @@ -17,7 +17,8 @@ */ package org.apache.giraph.io.formats; -import org.apache.giraph.graph.Edge; +import com.google.common.collect.Lists; +import org.apache.giraph.edge.Edge; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -25,8 +26,6 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import com.google.common.collect.Lists; - import java.io.IOException; import java.util.List; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java index 934663e..5815403 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java @@ -17,8 +17,8 @@ */ package org.apache.giraph.io.formats; -import org.apache.giraph.graph.Edge; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java index 352f054..fe4a1d5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java @@ -19,7 +19,7 @@ package org.apache.giraph.io.formats; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java index b00e495..28539f5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java @@ -18,16 +18,15 @@ package org.apache.giraph.io.formats; -import org.apache.giraph.graph.Edge; -import org.apache.giraph.graph.EdgeFactory; +import com.google.common.collect.Lists; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import com.google.common.collect.Lists; - import java.io.IOException; import java.util.List; import java.util.regex.Pattern; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java index dda3f2f..4950d21 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java @@ -17,15 +17,14 @@ */ package org.apache.giraph.io.formats; -import org.apache.giraph.graph.Edge; +import com.google.common.collect.ImmutableList; +import org.apache.giraph.edge.Edge; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import 
com.google.common.collect.ImmutableList; - import java.io.IOException; /** http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java index 1e3b643..0270348 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java @@ -18,8 +18,8 @@ package org.apache.giraph.io.formats; -import org.apache.giraph.graph.ReverseEdgeDuplicator; import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.io.ReverseEdgeDuplicator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java index 21ca427..6eaf7dc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java @@ -18,8 +18,10 @@ package org.apache.giraph.io.formats; -import org.apache.giraph.graph.Edge; -import org.apache.giraph.graph.EdgeFactory; +import com.google.common.collect.Lists; +import net.iharder.Base64; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; import 
org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -29,9 +31,6 @@ import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; -import com.google.common.collect.Lists; -import net.iharder.Base64; - import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java index 0599742..7d8fcf6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java @@ -18,8 +18,9 @@ package org.apache.giraph.io.formats; -import org.apache.giraph.graph.Edge; -import org.apache.giraph.vertex.Vertex; +import net.iharder.Base64; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -28,8 +29,6 @@ import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; -import net.iharder.Base64; - import java.io.ByteArrayOutputStream; import java.io.DataOutput; import java.io.DataOutputStream; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java index 2df20f1..2ac2dad 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java @@ -17,9 +17,10 @@ */ package org.apache.giraph.io.formats; -import org.apache.giraph.graph.Edge; -import org.apache.giraph.graph.EdgeFactory; -import org.apache.giraph.vertex.Vertex; +import com.google.common.collect.Lists; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; @@ -29,8 +30,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.json.JSONArray; import org.json.JSONException; -import com.google.common.collect.Lists; - import java.io.IOException; import java.util.List; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java index 9a751ae..d0a3305 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java @@ -18,8 +18,8 @@ package org.apache.giraph.io.formats; -import org.apache.giraph.graph.Edge; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.Vertex; import 
org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java index 4e35201..09fb991 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java @@ -17,8 +17,8 @@ */ package org.apache.giraph.io.formats; -import org.apache.giraph.graph.Edge; -import org.apache.giraph.graph.EdgeFactory; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java index 2024863..cd454e3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java @@ -18,12 +18,13 @@ package org.apache.giraph.io.formats; +import com.google.common.collect.Sets; import org.apache.giraph.bsp.BspInputSplit; import 
org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.graph.Edge; -import org.apache.giraph.graph.EdgeFactory; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.InputSplit; @@ -31,8 +32,6 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; -import com.google.common.collect.Sets; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -91,7 +90,7 @@ public class PseudoRandomEdgeInputFormat /** Aggregate vertices (all input splits). */ private long aggregateVertices = -1; /** Edges per vertex. */ - private long edgesPerVertex = -1; + private int edgesPerVertex = -1; /** BspInputSplit (used only for index). */ private BspInputSplit bspInputSplit; /** Saved configuration */ @@ -129,7 +128,7 @@ public class PseudoRandomEdgeInputFormat "initialize: Got " + inputSplit.getClass() + " instead of " + BspInputSplit.class); } - edgesPerVertex = configuration.getLong( + edgesPerVertex = configuration.getInt( PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0); if (edgesPerVertex <= 0) { throw new IllegalArgumentException( @@ -184,8 +183,8 @@ public class PseudoRandomEdgeInputFormat "" + destVertexId + ")"); } return EdgeFactory.create( - destVertexId, - new DoubleWritable(random.nextDouble())); + destVertexId, + new DoubleWritable(random.nextDouble())); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java 
b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java index 4da8f9d..40a20e1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java @@ -18,15 +18,14 @@ package org.apache.giraph.io.formats; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.giraph.bsp.BspInputSplit; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.graph.Edge; -import org.apache.giraph.graph.EdgeFactory; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexReader; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.edge.VertexEdges; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; @@ -88,7 +87,7 @@ public class PseudoRandomVertexInputFormat extends /** Aggregate vertices (all input splits). */ private long aggregateVertices = -1; /** Edges per vertex. */ - private long edgesPerVertex = -1; + private int edgesPerVertex = -1; /** BspInputSplit (used only for index). */ private BspInputSplit bspInputSplit; /** Saved configuration */ @@ -132,7 +131,7 @@ public class PseudoRandomVertexInputFormat extends "initialize: Got " + inputSplit.getClass() + " instead of " + BspInputSplit.class); } - edgesPerVertex = configuration.getLong( + edgesPerVertex = configuration.getInt( PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0); if (edgesPerVertex <= 0) { throw new IllegalArgumentException( @@ -161,8 +160,10 @@ public class PseudoRandomVertexInputFormat extends // same. 
Random rand = new Random(vertexId); DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble()); - List<Edge<LongWritable, DoubleWritable>> edges = - Lists.newArrayListWithCapacity((int) edgesPerVertex); + // In order to save memory and avoid copying, we add directly to a + // VertexEdges instance. + VertexEdges<LongWritable, DoubleWritable> edges = + configuration.createAndInitializeVertexEdges(edgesPerVertex); Set<LongWritable> destVertices = Sets.newHashSet(); for (long i = 0; i < edgesPerVertex; ++i) { LongWritable destVertexId = new LongWritable(); http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java index 6a5813b..1071196 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java @@ -17,7 +17,7 @@ */ package org.apache.giraph.io.formats; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexReader; import org.apache.hadoop.io.Writable; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java index 0538db9..468e6bd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java +++
b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java @@ -19,7 +19,7 @@ package org.apache.giraph.io.formats; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.VertexWriter; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java index 36d00db..f7da40f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java @@ -17,8 +17,8 @@ */ package org.apache.giraph.io.formats; -import org.apache.giraph.graph.Edge; -import org.apache.giraph.graph.EdgeFactory; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java index c9f5df1..0aae894 100644 --- 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java @@ -19,10 +19,10 @@ package org.apache.giraph.io.formats; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.graph.Edge; -import org.apache.giraph.graph.EdgeFactory; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java index e359f66..898e57f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java @@ -19,10 +19,10 @@ package org.apache.giraph.io.formats; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.graph.Edge; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexReader; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java index 9f1fe1f..ad96cfe 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java @@ -18,7 +18,7 @@ package org.apache.giraph.io.formats; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.VertexWriter; import org.apache.hadoop.io.Text;