Return-Path: X-Original-To: apmail-incubator-giraph-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-giraph-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D2C267BFB for ; Thu, 17 Nov 2011 09:51:07 +0000 (UTC) Received: (qmail 8973 invoked by uid 500); 17 Nov 2011 09:51:07 -0000 Delivered-To: apmail-incubator-giraph-commits-archive@incubator.apache.org Received: (qmail 8938 invoked by uid 500); 17 Nov 2011 09:51:06 -0000 Mailing-List: contact giraph-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: giraph-dev@incubator.apache.org Delivered-To: mailing list giraph-commits@incubator.apache.org Received: (qmail 8923 invoked by uid 99); 17 Nov 2011 09:51:05 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Nov 2011 09:51:05 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Nov 2011 09:51:02 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 95C6923889B3; Thu, 17 Nov 2011 09:50:42 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1203130 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/ src/test... Date: Thu, 17 Nov 2011 09:50:42 -0000 To: giraph-commits@incubator.apache.org From: aching@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111117095042.95C6923889B3@eris.apache.org> Author: aching Date: Thu Nov 17 09:50:41 2011 New Revision: 1203130 URL: http://svn.apache.org/viewvc?rev=1203130&view=rev Log: GIRAPH-91: Large-memory improvements (Memory reduced vertex implementation, fast failure, added settings). (aching) Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/ incubator/giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java Modified: incubator/giraph/trunk/CHANGELOG incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java Modified: incubator/giraph/trunk/CHANGELOG URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1203130&r1=1203129&r2=1203130&view=diff ============================================================================== --- incubator/giraph/trunk/CHANGELOG (original) +++ incubator/giraph/trunk/CHANGELOG Thu Nov 17 09:50:41 2011 @@ -2,6 +2,9 @@ Giraph Change Log Release 0.70.0 - unreleased + GIRAPH-91: Large-memory improvements (Memory reduced vertex + implementation, fast failure, added settings). (aching) + GIRAPH-89: Remove debugging system.out from LongDoubleFloatDoubleVertex. (shaunak via aching) Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java?rev=1203130&r1=1203129&r2=1203130&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java Thu Nov 17 09:50:41 2011 @@ -23,6 +23,7 @@ import org.apache.commons.cli.CommandLin import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.giraph.graph.EdgeListVertex; import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.Vertex; import org.apache.hadoop.conf.Configuration; @@ -36,33 +37,60 @@ import java.util.Iterator; /** * Benchmark based on the basic Pregel PageRank implementation. */ -public class PageRankBenchmark extends - Vertex - implements Tool { +public class PageRankBenchmark implements Tool { /** Configuration from Configurable */ private Configuration conf; /** How many supersteps to run */ public static String SUPERSTEP_COUNT = "PageRankBenchmark.superstepCount"; - @Override - public void compute(Iterator msgIterator) { - if (getSuperstep() >= 1) { - double sum = 0; - while (msgIterator.hasNext()) { - sum += msgIterator.next().get(); + public static class PageRankVertex extends Vertex< + LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> { + @Override + public void compute(Iterator msgIterator) { + if (getSuperstep() >= 1) { + double sum = 0; + while (msgIterator.hasNext()) { + sum += msgIterator.next().get(); + } + DoubleWritable vertexValue = + new DoubleWritable((0.15f / getNumVertices()) + 0.85f * + sum); + setVertexValue(vertexValue); + } + + if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) { + long edges = getNumOutEdges(); + sendMsgToAllEdges( + new DoubleWritable(getVertexValue().get() / edges)); + } else { + voteToHalt(); } - DoubleWritable vertexValue = - new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum); - setVertexValue(vertexValue); } + } - if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) { - long edges = getNumOutEdges(); - sendMsgToAllEdges( - new DoubleWritable(getVertexValue().get() / edges)); - } else { - voteToHalt(); + public static class PageRankEdgeListVertex extends EdgeListVertex< + LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> { + @Override + public void compute(Iterator msgIterator) { + if (getSuperstep() >= 1) { + double sum = 0; + while (msgIterator.hasNext()) { + sum += msgIterator.next().get(); + } + DoubleWritable vertexValue = + new DoubleWritable((0.15f / getNumVertices()) + 0.85f * + sum); + setVertexValue(vertexValue); + } + + if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) { + long edges = getNumOutEdges(); + sendMsgToAllEdges( + new DoubleWritable(getVertexValue().get() / edges)); + } else { + voteToHalt(); + } } } @@ -97,6 +125,10 @@ public class PageRankBenchmark extends "edgesPerVertex", true, "Edges per vertex"); + options.addOption("c", + "vertexClass", + true, + "Vertex class (0 for Vertex, 1 for EdgeListVertex)"); HelpFormatter formatter = new HelpFormatter(); if (args.length == 0) { formatter.printHelp(getClass().getName(), options, true); @@ -125,9 +157,19 @@ public class PageRankBenchmark extends "per vertex (-e)"); return -1; } + int workers = Integer.parseInt(cmd.getOptionValue('w')); GiraphJob job = new GiraphJob(getConf(), getClass().getName()); - job.setVertexClass(getClass()); + if (!cmd.hasOption('c') || + (Integer.parseInt(cmd.getOptionValue('c')) == 0)) { + System.out.println("Using " + + PageRankVertex.class.getName()); + job.setVertexClass(PageRankVertex.class); + } else { + System.out.println("Using " + + PageRankEdgeListVertex.class.getName()); + job.setVertexClass(PageRankEdgeListVertex.class); + } job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class); job.setWorkerConfiguration(workers, workers, 100.0f); job.getConfiguration().setLong( Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1203130&r1=1203129&r2=1203130&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Thu Nov 17 09:50:41 2011 @@ -153,4 +153,9 @@ public interface CentralizedServiceWorke * @return BspMapper */ GraphMapper getGraphMapper(); + + /** + * Operations that will be called if there is a failure by a worker. + */ + void failureCleanup(); } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1203130&r1=1203129&r2=1203130&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Thu Nov 17 09:50:41 2011 @@ -851,6 +851,7 @@ end[HADOOP_FACEBOOK]*/ for (Future future : futures) { try { future.get(); + context.progress(); } catch (Exception e) { throw new RuntimeException(e); } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1203130&r1=1203129&r2=1203130&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Nov 17 09:50:41 2011 @@ -136,8 +136,9 @@ public class BspServiceWorker< commService = new RPCCommunications( context, this, graphState); graphState.setWorkerCommunications(commService); - this.workerContext = BspUtils.createWorkerContext(getConfiguration(), - graphMapper.getGraphState()); + this.workerContext = + BspUtils.createWorkerContext(getConfiguration(), + graphMapper.getGraphState()); } public WorkerContext getWorkerContext() { @@ -710,6 +711,30 @@ public class BspServiceWorker< } } + /** + * Do this to help notify the master quicker that this worker has failed. + */ + private void unregisterHealth() { + LOG.error("unregisterHealth: Got failure, unregistering health on " + + myHealthZnode + " on superstep " + getSuperstep()); + try { + getZkExt().delete(myHealthZnode, -1); + } catch (InterruptedException e) { + throw new IllegalStateException( + "unregisterHealth: InterruptedException - Couldn't delete " + + myHealthZnode, e); + } catch (KeeperException e) { + throw new IllegalStateException( + "unregisterHealth: KeeperException - Couldn't delete " + + myHealthZnode, e); + } + } + + @Override + public void failureCleanup() { + unregisterHealth(); + } + @Override public Collection startSuperstep() { // Algorithm: Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1203130&view=auto ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (added) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Thu Nov 17 09:50:41 2011 @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +import com.google.common.collect.Lists; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * User applications can subclass {@link EdgeListVertex}, which stores + * the outbound edges in an ArrayList (less memory as the cost of expensive + * sorting and random-access lookup). Good for static graphs. + * + * @param Vertex index value + * @param Vertex value + * @param Edge value + * @param Message value + */ +@SuppressWarnings("rawtypes") +public abstract class EdgeListVertex + extends MutableVertex { + /** Class logger */ + private static final Logger LOG = Logger.getLogger(EdgeListVertex.class); + /** Vertex id */ + private I vertexId = null; + /** Vertex value */ + private V vertexValue = null; + /** List of the dest edge indices */ + private List destEdgeIndexList; + /** List of the dest edge values */ + /** Map of destination vertices and their edge values */ + private List destEdgeValueList; + /** List of incoming messages from the previous superstep */ + private final List msgList = new ArrayList(); + + @Override + public void initialize(I vertexId, V vertexValue, + Map edges, + List messages) { + if (vertexId != null) { + setVertexId(vertexId); + } + if (vertexValue != null) { + setVertexValue(vertexValue); + } + if (edges != null && !edges.isEmpty()) { + destEdgeIndexList = Lists.newArrayListWithCapacity(edges.size()); + destEdgeValueList = Lists.newArrayListWithCapacity(edges.size()); + List sortedIndexList = new ArrayList(edges.keySet()); + Collections.sort(sortedIndexList, new VertexIdComparator()); + for (I index : sortedIndexList) { + destEdgeIndexList.add(index); + destEdgeValueList.add(edges.get(index)); + } + sortedIndexList.clear(); + } else { + destEdgeIndexList = Lists.newArrayList(); + destEdgeValueList = Lists.newArrayList(); + } + if (messages != null && !messages.isEmpty()) { + msgList.addAll(messages); + } + } + + @Override + public boolean equals(Object other) { + if (other instanceof EdgeListVertex) { + @SuppressWarnings("unchecked") + EdgeListVertex otherVertex = (EdgeListVertex) other; + if (!getVertexId().equals(otherVertex.getVertexId())) { + return false; + } + if (!getVertexValue().equals(otherVertex.getVertexValue())) { + return false; + } + if (!getMsgList().equals(otherVertex.getMsgList())) { + return false; + } + Iterator iterator = iterator(); + Iterator otherIterator = otherVertex.iterator(); + while (iterator.hasNext() && otherIterator.hasNext()) { + I index = iterator.next(); + I otherIndex = otherIterator.next(); + if (!(index == null ? otherIndex == null : + index.equals(otherIndex))) { + return false; + } + } + return !(iterator.hasNext() || otherIterator.hasNext()); + } + return false; + } + + /** + * Comparator for the vertex id + */ + private class VertexIdComparator implements Comparator { + @SuppressWarnings("unchecked") + @Override + public int compare(I index1, I index2) { + return index1.compareTo(index2); + } + } + + @Override + public final boolean addEdge(I targetVertexId, E edgeValue) { + int pos = Collections.binarySearch(destEdgeIndexList, + targetVertexId, + new VertexIdComparator()); + if (pos == destEdgeIndexList.size() || + !destEdgeIndexList.get(pos).equals(targetVertexId)) { + destEdgeIndexList.add(pos, targetVertexId); + destEdgeValueList.add(pos, edgeValue); + return true; + } else { + LOG.warn("addEdge: Vertex=" + vertexId + + ": already added an edge value for dest vertex id " + + targetVertexId); + return false; + } + } + + @Override + public long getSuperstep() { + return getGraphState().getSuperstep(); + } + + @Override + public final void setVertexId(I vertexId) { + this.vertexId = vertexId; + } + + @Override + public final I getVertexId() { + return vertexId; + } + + @Override + public final V getVertexValue() { + return vertexValue; + } + + @Override + public final void setVertexValue(V vertexValue) { + this.vertexValue = vertexValue; + } + + @Override + public E getEdgeValue(I targetVertexId) { + int pos = Collections.binarySearch(destEdgeIndexList, + targetVertexId, + new VertexIdComparator()); + if (pos == destEdgeIndexList.size() || + !destEdgeIndexList.get(pos).equals(targetVertexId)) { + return null; + } else { + return destEdgeValueList.get(pos); + } + } + + @Override + public boolean hasEdge(I targetVertexId) { + int pos = Collections.binarySearch(destEdgeIndexList, + targetVertexId, + new VertexIdComparator()); + if (pos == destEdgeIndexList.size() || + !destEdgeIndexList.get(pos).equals(targetVertexId)) { + return false; + } else { + return true; + } + } + + /** + * Get an iterator to the edges on this vertex. + * + * @return A sorted iterator, as defined by the sort-order + * of the vertex ids + */ + @Override + public Iterator iterator() { + return destEdgeIndexList.iterator(); + } + + @Override + public int getNumOutEdges() { + return destEdgeIndexList.size(); + } + + @Override + public E removeEdge(I targetVertexId) { + int pos = Collections.binarySearch(destEdgeIndexList, + targetVertexId, + new VertexIdComparator()); + if (pos == destEdgeIndexList.size() || + !destEdgeIndexList.get(pos).equals(targetVertexId)) { + return null; + } else { + destEdgeIndexList.remove(pos); + return destEdgeValueList.remove(pos); + } + } + + @Override + public final void sendMsgToAllEdges(M msg) { + if (msg == null) { + throw new IllegalArgumentException( + "sendMsgToAllEdges: Cannot send null message to all edges"); + } + for (I index : destEdgeIndexList) { + sendMsg(index, msg); + } + } + + @Override + public void addVertexRequest(MutableVertex vertex) + throws IOException { + getGraphState().getWorkerCommunications(). + addVertexReq(vertex); + } + + @Override + public void removeVertexRequest(I vertexId) throws IOException { + getGraphState().getWorkerCommunications(). + removeVertexReq(vertexId); + } + + @Override + public void addEdgeRequest(I vertexIndex, + Edge edge) throws IOException { + getGraphState().getWorkerCommunications(). + addEdgeReq(vertexIndex, edge); + } + + @Override + public void removeEdgeRequest(I sourceVertexId, + I destVertexId) throws IOException { + getGraphState().getWorkerCommunications(). + removeEdgeReq(sourceVertexId, destVertexId); + } + + @Override + final public void readFields(DataInput in) throws IOException { + vertexId = BspUtils.createVertexIndex(getConf()); + vertexId.readFields(in); + boolean hasVertexValue = in.readBoolean(); + if (hasVertexValue) { + vertexValue = BspUtils.createVertexValue(getConf()); + vertexValue.readFields(in); + } + int edgeListCount = in.readInt(); + destEdgeIndexList = Lists.newArrayListWithCapacity(edgeListCount); + destEdgeValueList = Lists.newArrayListWithCapacity(edgeListCount); + for (int i = 0; i < edgeListCount; ++i) { + I vertexId = BspUtils.createVertexIndex(getConf()); + E edgeValue = BspUtils.createEdgeValue(getConf()); + vertexId.readFields(in); + edgeValue.readFields(in); + destEdgeIndexList.add(vertexId); + destEdgeValueList.add(edgeValue); + } + int msgListSize = in.readInt(); + for (int i = 0; i < msgListSize; ++i) { + M msg = BspUtils.createMessageValue(getConf()); + msg.readFields(in); + msgList.add(msg); + } + halt = in.readBoolean(); + } + + @Override + final public void write(DataOutput out) throws IOException { + vertexId.write(out); + out.writeBoolean(vertexValue != null); + if (vertexValue != null) { + vertexValue.write(out); + } + out.writeInt(destEdgeIndexList.size()); + for (int i = 0 ; i < destEdgeIndexList.size(); ++i) { + destEdgeIndexList.get(i).write(out); + destEdgeValueList.get(i).write(out); + } + out.writeInt(msgList.size()); + for (M msg : msgList) { + msg.write(out); + } + out.writeBoolean(halt); + } + + @Override + public List getMsgList() { + return msgList; + } + + @Override + public String toString() { + return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + + ",#edges=" + getNumOutEdges() + ")"; + } +} + Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1203130&r1=1203129&r2=1203130&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Thu Nov 17 09:50:41 2011 @@ -22,6 +22,7 @@ import org.apache.giraph.bsp.BspInputFor import org.apache.giraph.bsp.BspOutputFormat; import org.apache.giraph.graph.partition.GraphPartitionerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Client; import org.apache.hadoop.mapreduce.Job; import org.apache.log4j.Logger; @@ -261,7 +262,7 @@ public class GiraphJob extends Job { public static final Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false; /** Default ZooKeeper tick time. */ - public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 2000; + public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 6000; /** Default ZooKeeper init limit (in ticks). */ public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10; /** Default ZooKeeper sync limit (in ticks). */ @@ -270,10 +271,10 @@ public class GiraphJob extends Job { public static final int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000; /** Default ZooKeeper maximum client connections. */ public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000; - /** Default ZooKeeper minimum session timeout (in msecs). */ - public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 10000; - /** Default ZooKeeper maximum session timeout (in msecs). */ - public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 100000; + /** Default ZooKeeper minimum session timeout of 5 minutes (in msecs). */ + public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 300*1000; + /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */ + public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600*1000; /** Class logger */ private static final Logger LOG = Logger.getLogger(GiraphJob.class); @@ -511,6 +512,10 @@ public class GiraphJob extends Job { // Speculative execution doesn't make sense for Giraph conf.setBoolean("mapred.map.tasks.speculative.execution", false); + // Set the ping interval to 5 minutes instead of one minute + // (DEFAULT_PING_INTERVAL) + Client.setPingInterval(conf, 60000*5); + if (getJar() == null) { setJarByClass(GiraphJob.class); } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1203130&r1=1203129&r2=1203130&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu Nov 17 09:50:41 2011 @@ -627,4 +627,25 @@ public class GraphMapper { + + @Override + public void compute(Iterator msgIterator) + throws IOException { + } + } + + @Override + public void setUp() { + try { + job = new GiraphJob("TestEdgeArrayVertex"); + } catch (IOException e) { + throw new RuntimeException("setUp: Failed", e); + } + job.setVertexClass(IFDLEdgeListVertex.class); + job.getConfiguration().setClass(GiraphJob.VERTEX_INDEX_CLASS, + IntWritable.class, WritableComparable.class); + job.getConfiguration().setClass(GiraphJob.VERTEX_VALUE_CLASS, + FloatWritable.class, Writable.class); + job.getConfiguration().setClass(GiraphJob.EDGE_VALUE_CLASS, + DoubleWritable.class, Writable.class); + job.getConfiguration().setClass(GiraphJob.MESSAGE_VALUE_CLASS, + LongWritable.class, Writable.class); + vertex = (IFDLEdgeListVertex) + BspUtils. + createVertex(job.getConfiguration()); + } + + public void testInstantiate() throws IOException { + assertNotNull(vertex); + } + + public void testEdges() { + Map edgeMap = Maps.newHashMap(); + for (int i = 1000; i > 0; --i) { + edgeMap.put(new IntWritable(i), new DoubleWritable(i*2.0)); + } + vertex.initialize(null, null, edgeMap, null); + assertEquals(vertex.getNumOutEdges(), 1000); + int expectedIndex = 1; + for (IntWritable index : vertex) { + assertEquals(index.get(), expectedIndex); + assertEquals(vertex.getEdgeValue(index).get(), + expectedIndex * 2.0d); + ++expectedIndex; + } + assertEquals(vertex.removeEdge(new IntWritable(500)), + new DoubleWritable(1000)); + assertEquals(vertex.getNumOutEdges(), 999); + } + + public void testSerialize() { + Map edgeMap = Maps.newHashMap(); + for (int i = 1000; i > 0; --i) { + edgeMap.put(new IntWritable(i), new DoubleWritable(i*2.0)); + } + List messageList = Lists.newArrayList(); + messageList.add(new LongWritable(4)); + messageList.add(new LongWritable(5)); + vertex.initialize( + new IntWritable(2), new FloatWritable(3.0f), edgeMap, messageList); + byte[] byteArray = WritableUtils.writeToByteArray(vertex); + IFDLEdgeListVertex readVertex = (IFDLEdgeListVertex) + BspUtils. + createVertex(job.getConfiguration()); + WritableUtils.readFieldsFromByteArray(byteArray, readVertex); + assertEquals(vertex, readVertex); + } +}