Return-Path: X-Original-To: apmail-hama-commits-archive@www.apache.org Delivered-To: apmail-hama-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8BD0D179B7 for ; Wed, 8 Oct 2014 20:04:02 +0000 (UTC) Received: (qmail 97522 invoked by uid 500); 8 Oct 2014 20:04:02 -0000 Delivered-To: apmail-hama-commits-archive@hama.apache.org Received: (qmail 97490 invoked by uid 500); 8 Oct 2014 20:04:02 -0000 Mailing-List: contact commits-help@hama.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hama.apache.org Delivered-To: mailing list commits@hama.apache.org Received: (qmail 97479 invoked by uid 99); 8 Oct 2014 20:04:02 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Oct 2014 20:04:02 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Oct 2014 20:03:37 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 7187B2388A3D; Wed, 8 Oct 2014 20:03:35 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1630222 - in /hama/trunk: CHANGES.txt graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java graph/src/main/java/org/apache/hama/graph/VertexMessages.java Date: Wed, 08 Oct 2014 20:03:35 -0000 To: commits@hama.apache.org From: andronat@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20141008200335.7187B2388A3D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: andronat Date: Wed Oct 8 20:03:34 2014 New Revision: 1630222 URL: http://svn.apache.org/r1630222 Log: HAMA-921 Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java Removed: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java Modified: hama/trunk/CHANGES.txt hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Modified: hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1630222&r1=1630221&r2=1630222&view=diff ============================================================================== --- hama/trunk/CHANGES.txt (original) +++ hama/trunk/CHANGES.txt Wed Oct 8 20:03:34 2014 @@ -17,6 +17,7 @@ Release 0.7.0 (unreleased changes) HAMA-885: Semi-Clustering is not producing expected output (Renil J via edwardyoon) IMPROVEMENTS + HAMA-921: Polish doSuperstep() function and VertexMessageIterable class (Anastasis Andronidis) HAMA-913: Add RPC implementation using netty(bsmin) HAMA-914: Boolean flag (isCompressed) is required only when runtime compression is enabled (edwardyoon) HAMA-910: Web UI Improvement (Victor Lee via edwardyoon) Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1630222&r1=1630221&r2=1630222&view=diff ============================================================================== --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original) +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Oct 8 20:03:34 2014 @@ -20,7 +20,6 @@ package org.apache.hama.graph; import java.io.IOException; import java.util.Collections; import java.util.Map.Entry; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -154,6 +153,8 @@ public final class GraphJobRunner pair as a result. Note that * this will also be executed when failure happened. + * @param peer + * @throws java.io.IOException */ @Override public final void cleanup( @@ -203,7 +204,7 @@ public final class GraphJobRunner peer) throws IOException { int activeVertices = 0; this.changedVertexCnt = 0; - vertices.startSuperstep(); + this.vertices.startSuperstep(); - /* - * We iterate over our messages and vertices in sorted order. That means - * that we need to seek the first vertex that has the same ID as the - * currentMessage or the first vertex that is active. - */ - IDSkippingIterator iterator = vertices.skippingIterator(); - VertexMessageIterable iterable = null; - Vertex vertex = null; + IDSkippingIterator iterator = this.vertices.skippingIterator(); + VertexMessages queueMessages = new VertexMessages(peer); + queueMessages.prependMessage(currentMessage); // note that can't skip inactive vertices because we have to rewrite the // complete vertex file in each iteration - while (iterator.hasNext( - currentMessage == null ? null : (V) currentMessage.getVertexId(), - Strategy.ALL)) { - - vertex = iterator.next(); - if (currentMessage != null) { - iterable = iterate(currentMessage, (V) currentMessage.getVertexId(), - vertex, peer); - } else { - iterable = null; - } + V firstVID = currentMessage == null ? null : (V) currentMessage.getVertexId(); + while (iterator.hasNext(firstVID, Strategy.ALL)) { + Vertex vertex = iterator.next(); + boolean msgsExist = queueMessages.continueWith(vertex.getVertexID()); - if (iterable != null && vertex.isHalted()) { + if (!msgsExist) checkMsgOrder(vertex.getVertexID(), queueMessages); + + if (msgsExist && vertex.isHalted()) { vertex.setActive(); } if (!vertex.isHalted()) { - if (iterable == null) { - vertex.compute(Collections. emptyList()); - } else { - vertex.compute(iterable); - currentMessage = iterable.getOverflowMessage(); - } + vertex.compute(queueMessages); activeVertices++; } + // Dump remaining messages + queueMessages.dumpRest(); + // note that we even need to rewrite the vertex if it is halted for // consistency reasons - vertices.finishVertexComputation(vertex); + this.vertices.finishVertexComputation(vertex); } - vertices.finishSuperstep(); + this.vertices.finishSuperstep(); getAggregationRunner().sendAggregatorValues(peer, activeVertices, this.changedVertexCnt); - iteration++; + this.iteration++; } /** - * Iterating utility that ensures following things:
- * - if vertex is active, but the given message does not match the vertexID, - * return null.
- * - if vertex is inactive, but received a message that matches the ID, build - * an iterator that can be iterated until the next vertex has been reached - * (not buffer in memory) and set the vertex active
- * - if vertex is active, and the given message does match the vertexID, - * return an iterator that can be iterated until the next vertex has been - * reached.
- * - if vertex is inactive, and received no message, return null. + * Utility that ensures that the incoming messages have a target vertex. */ - @SuppressWarnings("unchecked") - private VertexMessageIterable iterate(GraphJobMessage currentMessage, - V firstMessageId, Vertex vertex, - BSPPeer peer) { - int comparision = firstMessageId.compareTo(vertex.getVertexID()); - if (conf.getBoolean("hama.check.missing.vertex", true)) { - if (comparision < 0) { + private void checkMsgOrder(V vid, VertexMessages vm) { + // When the vid is greater than the current message, it means that a vertex + // has sent a message to an other vertex that doesn't exist + if (vm.getMessageVID() != null && vm.getMessageVID().compareTo(vid) < 0) { + if (conf.getBoolean("hama.check.missing.vertex", true)) { throw new IllegalArgumentException( - "A message has recieved with a destination ID: " + firstMessageId + - " that does not exist! (Vertex iterator is at" + vertex.getVertexID() - + " ID)"); - } - } else { - while (comparision < 0) { - VertexMessageIterable messageIterable = new VertexMessageIterable( - currentMessage, firstMessageId, peer); - currentMessage = messageIterable.getOverflowMessage(); - firstMessageId = (V) currentMessage.getVertexId(); - comparision = firstMessageId.compareTo(vertex.getVertexID()); - } - } - if (comparision == 0) { - // vertex id matches with the vertex, return an iterator with newest - // message - return new VertexMessageIterable(currentMessage, - vertex.getVertexID(), peer); - } else { - // return null - return null; + "A message has recieved with a destination ID: " + vm.getMessageVID() + + " that does not exist! (Vertex iterator is at" + vid + " ID)"); + } else { + // Skip all unrecognized messages until we find a match + vm.continueUntil(vid); + } } } @@ -431,9 +396,9 @@ public final class GraphJobRunner 0) { - throw new IOException("The records of split aren't in order by vertex ID."); + throw new IOException("The records of split aren't in order by vertex ID."); } - + if (selfReference) { vertex.addEdge(new Edge(vertex.getVertexID(), null)); } Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java?rev=1630222&view=auto ============================================================================== --- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java (added) +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessages.java Wed Oct 8 20:03:34 2014 @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hama.graph; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.hama.bsp.BSPPeer; + +/** + * This class has as a target to iterate the whole sorted queue of the incoming + * messages. Each vertex will be able to call the hasNext() and + * next() methods to consume the messages. The iterator is + * responsible to understand when the messages of a specific Vertex ID have been + * consumed, and then unlock the messages of the next Vertex ID through the + * continueWith() method. + * + * @param + * @param + */ +public class VertexMessages implements Iterator, Iterable { + private final BSPPeer peer; + private V vid = null; + private GraphJobMessage currentMessage = null; + private boolean locked = true; + + public VertexMessages(BSPPeer peer) { + this.peer = peer; + } + + @Override + public boolean hasNext() { + if (locked) { + return false; + } + + try { + if (this.currentMessage == null) { + this.currentMessage = this.peer.getCurrentMessage(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + if (this.currentMessage != null && this.currentMessage.getVertexId().equals(this.vid)) { + return true; + } + // When a new ID has shown up or the messages are finished, + // we lock the iterator + this.locked = true; + return false; + } + + @Override + public T next() { + if (this.currentMessage == null || this.locked) { + return null; + } + + // Despose the current message and prepare for next hasNext() call + try { + return (T) this.currentMessage.getVertexValue(); + } finally { + this.currentMessage = null; + } + } + + /** + * By implementing both Iterator and Iterable + * interfaces, this class will not be able to re-iterated and the messages + * will be accessed only once. In our case this is fine. + * + * @return an one-time iterator + */ + @Override + public Iterator iterator() { + return this; + } + + /** + * This method should be used only after initialization. If an other message + * exists in the memory of the iterator, the new prepended message + * will be ignored. + * + * @param msg The message to be prepended just after initialization + */ + public void prependMessage(GraphJobMessage msg) { + if (this.currentMessage == null && msg != null) { + this.currentMessage = msg; + } + } + + /** + * Check the vertexID target of the current message that is loaded in the + * iterator and unlock the iterator only if the vid argument is + * matching. + * + * @param vid + * @return return true if the vid is equal to the next message's ID + */ + public boolean continueWith(V vid) { + // Normally when we call this method this.locked == true + this.vid = vid; + this.locked = false; + + if (this.currentMessage == null) { + // Get next message (if there is) and decide based on the new vid + return this.hasNext(); + } + + // If we have a message already loaded + if (!this.currentMessage.getVertexId().equals(vid)) { + this.locked = true; + return false; + } + return true; + } + + /** + * Consume the incoming messages until we find a message that has a target + * equal to the vid argument. + * + * @param vid + * @return + */ + public boolean continueUntil(V vid) { + do { + try { + if (this.currentMessage == null) { + this.currentMessage = this.peer.getCurrentMessage(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + if (this.currentMessage == null) { + this.locked = true; + return false; + } + } while (!this.currentMessage.getVertexId().equals(this.vid)); + + this.locked = false; + return true; + } + + public void dumpRest() { + while(this.hasNext()) { + this.next(); + } + } + + /** + * Return the target Vertex ID of the current loaded message. + * + * @return + */ + public V getMessageVID() { + return this.currentMessage == null ? null : (V) this.currentMessage.getVertexId(); + } +}