Author: ereisman
Date: Mon Sep 10 21:32:14 2012
New Revision: 1383115
URL: http://svn.apache.org/viewvc?rev=1383115&view=rev
Log:
GIRAPH-191: Random Walks On Graphs (Gianmarco De Francisci Morales via ereisman)
Added:
giraph/trunk/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java
giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java
giraph/trunk/src/main/java/org/apache/giraph/utils/MathUtils.java
giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java
giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java
giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java
giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java
giraph/trunk/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/checkstyle.xml
giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1383115&r1=1383114&r2=1383115&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Sep 10 21:32:14 2012
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-191: Random walks on graphs (Gianmarco De Francisci Morales via ereisman)
+
GIRAPH-320: Provide a runtime configuration for choosing the
log level (aching via ereisman)
Modified: giraph/trunk/checkstyle.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/checkstyle.xml?rev=1383115&r1=1383114&r2=1383115&view=diff
==============================================================================
--- giraph/trunk/checkstyle.xml (original)
+++ giraph/trunk/checkstyle.xml Mon Sep 10 21:32:14 2012
@@ -227,7 +227,9 @@
<module name="MethodLength">
<property name="max" value="200"/>
</module>
- <module name="ParameterNumber"/>
+ <module name="ParameterNumber">
+ <property name="max" value="8"/>
+ </module>
<!-- Checks for whitespace (tree walker) -->
<!-- See http://checkstyle.sf.net/config_whitespace.html -->
Added: giraph/trunk/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.utils.MathUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * A {@link VertexCombiner} that replaces every double-valued message headed
+ * for one vertex with a single message carrying their sum.
+ */
+public class DoubleSumCombiner extends
+    VertexCombiner<LongWritable, DoubleWritable> {
+  @Override
+  public Iterable<DoubleWritable> combine(LongWritable vertexIndex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    // Fold the whole batch into one total, then re-wrap it as one message.
+    double total = MathUtils.sum(messages);
+    return Collections.singleton(new DoubleWritable(total));
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.io.TextVertexInputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
+ * unweighted graphs with long ids. Each line consists of:
+ * <pre>vertex neighbor1 neighbor2 ...</pre>
+ * Tokens may be separated by any run of whitespace. Every out-edge of a
+ * vertex gets the same weight, 1 / (number of distinct neighbors), so the
+ * outgoing weights of a vertex with at least one neighbor sum to 1. The
+ * vertex value starts as a freshly constructed {@link DoubleWritable}.
+ */
+public class LongDoubleFloatDoubleTextInputFormat
+    extends
+    TextVertexInputFormat<LongWritable, DoubleWritable,
+    FloatWritable, DoubleWritable> {
+
+  @Override
+  public VertexReader<LongWritable,
+      DoubleWritable, FloatWritable, DoubleWritable>
+  createVertexReader(InputSplit split, TaskAttemptContext context)
+    throws IOException {
+    return new LongDoubleFloatDoubleVertexReader(
+        textInputFormat.createRecordReader(split, context));
+  }
+
+  /**
+   * Vertex reader associated with {@link LongDoubleFloatDoubleTextInputFormat}.
+   */
+  public static class LongDoubleFloatDoubleVertexReader extends
+      TextVertexInputFormat.TextVertexReader<LongWritable, DoubleWritable,
+      FloatWritable, DoubleWritable> {
+    /**
+     * Separator of the vertex and neighbors. Any run of whitespace, so that
+     * doubled blanks do not produce empty (unparseable) tokens.
+     */
+    private static final Pattern SEPARATOR = Pattern.compile("\\s+");
+
+    /**
+     * Constructor with the line reader.
+     * @param lineReader Internal line reader.
+     */
+    public LongDoubleFloatDoubleVertexReader(
+        RecordReader<LongWritable, Text> lineReader) {
+      super(lineReader);
+    }
+
+    @Override
+    public Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+    getCurrentVertex() throws IOException, InterruptedException {
+      Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+      vertex = BspUtils.<LongWritable, DoubleWritable,
+          FloatWritable, DoubleWritable>createVertex(getContext()
+          .getConfiguration());
+
+      // Trim first: leading whitespace would otherwise yield an empty id token.
+      String[] tokens = SEPARATOR.split(
+          getRecordReader().getCurrentValue().toString().trim());
+      Map<LongWritable, FloatWritable> edges =
+          Maps.newHashMapWithExpectedSize(tokens.length - 1);
+      for (int n = 1; n < tokens.length; n++) {
+        edges.put(new LongWritable(Long.parseLong(tokens[n])),
+            new FloatWritable());
+      }
+      // Weight is derived from the number of DISTINCT neighbors so that the
+      // out-weights still sum to 1 when a neighbor is listed more than once.
+      if (!edges.isEmpty()) {
+        float weight = 1.0f / edges.size();
+        for (FloatWritable edgeWeight : edges.values()) {
+          edgeWeight.set(weight);
+        }
+      }
+
+      LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
+      vertex.initialize(vertexId, new DoubleWritable(), edges,
+          Lists.<DoubleWritable>newArrayList());
+
+      return vertex;
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.io.TextVertexInputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
+ * weighted graphs with long ids. Each line consists of:
+ * <pre>vertex neighbor1:weight1 neighbor2:weight2 ...</pre>
+ * Tokens are separated by any run of whitespace. The outgoing edge weights of
+ * each vertex are L1-normalized (divided by their sum). A vertex listed
+ * without neighbors is loaded with no out-edges.
+ */
+public class NormalizingLongDoubleFloatDoubleTextInputFormat
+    extends
+    TextVertexInputFormat<LongWritable, DoubleWritable,
+    FloatWritable, DoubleWritable> {
+
+  @Override
+  public VertexReader<LongWritable, DoubleWritable,
+      FloatWritable, DoubleWritable> createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new NormalizingLongDoubleFloatDoubleVertexReader(
+        textInputFormat.createRecordReader(split, context));
+  }
+
+  /**
+   * Vertex reader associated with
+   * {@link NormalizingLongDoubleFloatDoubleTextInputFormat}.
+   */
+  public static class NormalizingLongDoubleFloatDoubleVertexReader
+      extends
+      TextVertexInputFormat.TextVertexReader<LongWritable, DoubleWritable,
+      FloatWritable, DoubleWritable> {
+    /** Separator of the vertex and neighbors */
+    private static final Pattern EDGE_SEPARATOR = Pattern.compile("\\s+");
+    /** Separator of the edge id and edge weight */
+    private static final Pattern WEIGHT_SEPARATOR = Pattern.compile(":");
+
+    /**
+     * Constructor with the line reader.
+     * @param lineReader
+     *          Internal line reader.
+     */
+    public NormalizingLongDoubleFloatDoubleVertexReader(
+        RecordReader<LongWritable, Text> lineReader) {
+      super(lineReader);
+    }
+
+    @Override
+    public Vertex<LongWritable, DoubleWritable,
+        FloatWritable, DoubleWritable> getCurrentVertex()
+      throws IOException, InterruptedException {
+      Vertex<LongWritable, DoubleWritable,
+          FloatWritable, DoubleWritable> vertex = BspUtils
+          .<LongWritable, DoubleWritable,
+          FloatWritable, DoubleWritable>createVertex(getContext()
+              .getConfiguration());
+
+      // Trim first: leading whitespace would otherwise yield an empty id token.
+      String[] tokens = EDGE_SEPARATOR.split(getRecordReader()
+          .getCurrentValue().toString().trim());
+      Map<LongWritable, FloatWritable> edges = Maps
+          .newHashMapWithExpectedSize(tokens.length - 1);
+      parse(tokens, edges);
+      // A vertex without out-edges is valid input (RandomWalkVertex routes
+      // such vertices through its DANGLING aggregator), so only normalize
+      // when there is something to normalize; normalize() rejects empty maps.
+      if (!edges.isEmpty()) {
+        normalize(edges);
+      }
+
+      LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
+      vertex.initialize(vertexId, new DoubleWritable(), edges,
+          Lists.<DoubleWritable>newArrayList());
+
+      return vertex;
+    }
+
+    /**
+     * Parse a set of tokens into a map ID -> weight. The first token (the
+     * vertex id) is skipped; every other token must be "neighbor:weight".
+     * @param tokens The tokens to be parsed.
+     * @param edges The map that will contain the result of the parsing.
+     * @throws IllegalArgumentException if an edge token is not of the form
+     *         "neighbor:weight" or contains an unparseable number.
+     */
+    static void parse(String[] tokens, Map<LongWritable, FloatWritable> edges) {
+      for (int n = 1; n < tokens.length; n++) {
+        String[] parts = WEIGHT_SEPARATOR.split(tokens[n]);
+        if (parts.length != 2) {
+          throw new IllegalArgumentException(
+              "Malformed edge, expected neighbor:weight but got '" +
+              tokens[n] + "'");
+        }
+        edges.put(new LongWritable(Long.parseLong(parts[0])),
+            new FloatWritable(Float.parseFloat(parts[1])));
+      }
+    }
+
+    /**
+     * Normalize the edges with L1 normalization, in place.
+     * @param edges The edges to be normalized.
+     * @throws IllegalArgumentException if edges is null or empty, or if the
+     *         weights sum to zero (the division would yield NaN/Infinity).
+     */
+    static void normalize(Map<LongWritable, FloatWritable> edges) {
+      if (edges == null || edges.isEmpty()) {
+        throw new IllegalArgumentException(
+            "Cannot normalize an empty set of edges");
+      }
+      float normalizer = 0.0f;
+      for (FloatWritable weight : edges.values()) {
+        normalizer += weight.get();
+      }
+      if (normalizer == 0.0f) {
+        throw new IllegalArgumentException(
+            "Cannot normalize a set of edges whose weights sum to zero");
+      }
+      for (FloatWritable weight : edges.values()) {
+        weight.set(weight.get() / normalizer);
+      }
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.DoubleSumAggregator;
+import org.apache.giraph.graph.DefaultMasterCompute;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.LongDoubleFloatDoubleEdgeListVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+/**
+ * Abstract base for running a random walk over the graph. Each vertex stores
+ * the probability that the walk currently sits on it. Subclasses choose the
+ * starting distribution ({@link #initialProbability()}) and how incoming
+ * probability mass is combined ({@link #recompute(Iterable, double)}).
+ */
+public abstract class RandomWalkVertex extends
+    LongDoubleFloatDoubleEdgeListVertex {
+  /** Configuration parameter for the number of supersteps to execute */
+  static final String MAX_SUPERSTEPS = RandomWalkVertex.class.getName() +
+      ".maxSupersteps";
+  /** Configuration parameter for the teleportation probability */
+  static final String TELEPORTATION_PROBABILITY = RandomWalkVertex.class
+      .getName() + ".teleportationProbability";
+  /** Name of aggregator for dangling nodes */
+  static final String DANGLING = "dangling";
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(RandomWalkVertex.class);
+  /** State probability of the vertex (reused as the vertex value) */
+  protected final DoubleWritable d = new DoubleWritable();
+
+  /**
+   * Supplies the probability this vertex holds before the first step.
+   * @return The initial probability value.
+   */
+  protected abstract double initialProbability();
+
+  /**
+   * Advances the walk by one step for this vertex.
+   * @param messages Probability mass received from in-neighbors during the
+   *        previous superstep.
+   * @param teleportationProbability Probability of teleporting to another
+   *        vertex.
+   * @return The new probability distribution value.
+   */
+  protected abstract double recompute(Iterable<DoubleWritable> messages,
+      double teleportationProbability);
+
+  @Override
+  public void compute(Iterable<DoubleWritable> messages) throws IOException {
+    // Superstep 0 seeds the distribution; later supersteps fold in the mass
+    // sent by in-neighbors.
+    double stateProbability = getSuperstep() > 0 ?
+        recompute(messages, teleportationProbability()) :
+        initialProbability();
+    d.set(stateProbability);
+    setValue(d);
+
+    // A vertex with no out-edges cannot forward its mass; publish it through
+    // the DANGLING aggregator for use in the next superstep.
+    if (getNumEdges() == 0) {
+      aggregate(DANGLING, d);
+    }
+
+    // Fixed iteration budget; convergence could instead be detected with an
+    // aggregator.
+    if (getSuperstep() >= maxSupersteps()) {
+      voteToHalt();
+      return;
+    }
+    for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
+      double share = stateProbability * edge.getValue().get();
+      sendMessage(edge.getTargetVertexId(), new DoubleWritable(share));
+    }
+  }
+
+  /**
+   * Looks up the superstep budget held by the {@link RandomWalkWorkerContext}.
+   * @return number of supersteps to execute
+   */
+  private int maxSupersteps() {
+    RandomWalkWorkerContext context =
+        (RandomWalkWorkerContext) getWorkerContext();
+    return context.getMaxSupersteps();
+  }
+
+  /**
+   * Looks up the teleportation probability held by the
+   * {@link RandomWalkWorkerContext}.
+   * @return teleportation probability
+   */
+  protected double teleportationProbability() {
+    RandomWalkWorkerContext context =
+        (RandomWalkWorkerContext) getWorkerContext();
+    return context.getTeleportationProbability();
+  }
+
+  /**
+   * Master compute associated with {@link RandomWalkVertex}. It registers the
+   * dangling-node aggregator and logs its value every superstep.
+   */
+  public static class RandomWalkVertexMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void compute() {
+      // TODO This is a good place to implement halting by checking convergence.
+      DoubleWritable aggregated =
+          this.<DoubleWritable>getAggregatedValue(RandomWalkVertex.DANGLING);
+      double danglingContribution = aggregated.get();
+      LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
+          danglingContribution);
+    }
+
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(RandomWalkVertex.DANGLING, DoubleSumAggregator.class);
+    }
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.utils.MathUtils;
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * Executes "RandomWalkWithRestart": a random walk that, on every step, jumps
+ * back to one of the configured source vertices with the teleportation
+ * probability. The resulting probability of being at a vertex can be read as
+ * a measure of its proximity to the source vertices.
+ */
+public class RandomWalkWithRestartVertex extends RandomWalkVertex {
+
+  /** Configuration parameter for the source vertex */
+  static final String SOURCE_VERTEX = RandomWalkWithRestartVertex.class
+      .getName() + ".sourceVertex";
+
+  /**
+   * Casts the generic worker context to the random-walk one.
+   * @return the worker context holding the preference vector.
+   */
+  private RandomWalkWorkerContext randomWalkContext() {
+    return (RandomWalkWorkerContext) getWorkerContext();
+  }
+
+  /**
+   * Tells whether the vertex being computed is in the preference vector.
+   * @return true if this vertex is a restart target.
+   */
+  private boolean isSourceVertex() {
+    return randomWalkContext().isSource(getId().get());
+  }
+
+  /**
+   * Counts the vertices in the preference vector.
+   * @return The number of source vertices.
+   */
+  private int numSourceVertexes() {
+    return randomWalkContext().numSources();
+  }
+
+  /**
+   * Reads the total mass parked on dangling vertices last superstep.
+   * @return The cumulated probability from dangling nodes.
+   */
+  private double getDanglingProbability() {
+    DoubleWritable dangling =
+        this.<DoubleWritable>getAggregatedValue(RandomWalkVertex.DANGLING);
+    return dangling.get();
+  }
+
+  /**
+   * Starts from a uniform distribution.
+   * @return A uniform probability over all the vertices.
+   */
+  @Override
+  protected double initialProbability() {
+    return 1.0 / getTotalNumVertices();
+  }
+
+  @Override
+  protected double recompute(Iterable<DoubleWritable> transitionProbabilities,
+      double teleportationProbability) {
+    double incoming = MathUtils.sum(transitionProbabilities);
+    // Weakly preferential handling: dangling mass is spread uniformly.
+    double danglingShare = getDanglingProbability() / getTotalNumVertices();
+    // With the teleportation probability the walk restarts instead of moving.
+    double walked = (incoming + danglingShare) * (1 - teleportationProbability);
+    return isSourceVertex() ?
+        walked + teleportationProbability / numSourceVertexes() :
+        walked;
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Set;
+
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Worker context for random walks. In {@link #preApplication()} it reads the
+ * superstep budget, the teleportation probability and the preference vector
+ * from the job configuration and keeps them in static fields for the vertices.
+ */
+public class RandomWalkWorkerContext extends WorkerContext {
+  /** Default maximum number of iterations */
+  private static final int DEFAULT_MAX_SUPERSTEPS = 30;
+  /** Default teleportation probability */
+  private static final float DEFAULT_TELEPORTATION_PROBABILITY = 0.15f;
+  /** Maximum number of iterations */
+  private static int MAX_SUPERSTEPS;
+  /** Teleportation probability */
+  private static double TELEPORTATION_PROBABILITY;
+  /** Preference vector */
+  private static Set<Long> SOURCES;
+  /**
+   * Whether {@link #preApplication()} has run. Tracked separately so that a
+   * legitimately configured 0 (e.g. no teleportation) is not mistaken for
+   * "not initialized".
+   */
+  private static boolean INITIALIZED;
+
+  /**
+   * Configuration parameter for the source vertex; the same key that
+   * {@link RandomWalkWithRestartVertex} declares.
+   */
+  private static final String SOURCE_VERTEX =
+      RandomWalkWithRestartVertex.SOURCE_VERTEX;
+
+  /** Logger */
+  private static final Logger LOG = Logger
+      .getLogger(RandomWalkWorkerContext.class);
+
+  /**
+   * Fails fast when the job was launched without this worker context, in
+   * which case the static parameters were never loaded.
+   * @throws IllegalStateException if {@link #preApplication()} has not run.
+   */
+  private static void checkInitialized() {
+    if (!INITIALIZED) {
+      throw new IllegalStateException(
+          RandomWalkWorkerContext.class.getSimpleName() +
+          " was not initialized. Relaunch your job " +
+          "by setting the appropriate WorkerContext");
+    }
+  }
+
+  /**
+   * @return The maximum number of iterations to perform.
+   * @throws IllegalStateException if this context was not initialized.
+   */
+  public int getMaxSupersteps() {
+    checkInitialized();
+    return MAX_SUPERSTEPS;
+  }
+
+  /**
+   * @return The teleportation probability.
+   * @throws IllegalStateException if this context was not initialized.
+   */
+  public double getTeleportationProbability() {
+    checkInitialized();
+    return TELEPORTATION_PROBABILITY;
+  }
+
+  /**
+   * Checks if a vertex is a source.
+   * @param id The vertex ID to check.
+   * @return True if the vertex is a source in the preference vector.
+   * @throws IllegalStateException if this context was not initialized.
+   */
+  public boolean isSource(long id) {
+    checkInitialized();
+    return SOURCES.contains(id);
+  }
+
+  /**
+   * @return The number of sources in the preference vector.
+   * @throws IllegalStateException if this context was not initialized.
+   */
+  public int numSources() {
+    checkInitialized();
+    return SOURCES.size();
+  }
+
+  /**
+   * Initialize sources for Random Walk with Restart. First option
+   * (preferential) is single source given from the command line as a parameter.
+   * Second option is a file with a list of vertex IDs, one per line (blank
+   * lines are ignored), read from the first local distributed-cache file. In
+   * this second case the preference vector is a uniform distribution over
+   * these vertices. If neither is available, or the file cannot be read, the
+   * problem is logged and the preference vector is left empty.
+   * @param configuration
+   *          The configuration.
+   */
+  private void initializeSources(Configuration configuration) {
+    ImmutableSet.Builder<Long> builder = ImmutableSet.builder();
+    long sourceVertex = configuration.getLong(SOURCE_VERTEX, Long.MIN_VALUE);
+    if (sourceVertex != Long.MIN_VALUE) {
+      builder.add(sourceVertex);
+    } else {
+      Path sourceFile = null;
+      try {
+        // getLocalCacheFiles returns null when nothing was cached; that is
+        // normal for random walks that need no preference vector.
+        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(configuration);
+        if (cacheFiles == null || cacheFiles.length == 0) {
+          LOG.warn("No " + SOURCE_VERTEX + " configured and no local cache " +
+              "file with source vertices: the preference vector is empty");
+        } else {
+          sourceFile = cacheFiles[0];
+          FileSystem fs = FileSystem.getLocal(configuration);
+          BufferedReader in = new BufferedReader(new InputStreamReader(
+              fs.open(sourceFile), "UTF-8"));
+          try {
+            String line;
+            while ((line = in.readLine()) != null) {
+              line = line.trim();
+              if (!line.isEmpty()) {
+                builder.add(Long.parseLong(line));
+              }
+            }
+          } finally {
+            // Always release the file handle, even if parsing fails.
+            in.close();
+          }
+        }
+      } catch (IOException e) {
+        getContext().setStatus(
+            "Could not load local cache files: " + sourceFile);
+        LOG.error("Could not load local cache files: " + sourceFile, e);
+      }
+    }
+    SOURCES = builder.build();
+  }
+
+  @Override
+  public void preApplication() throws InstantiationException,
+      IllegalAccessException {
+    Configuration configuration = this.getContext().getConfiguration();
+    MAX_SUPERSTEPS = configuration.getInt(RandomWalkVertex.MAX_SUPERSTEPS,
+        DEFAULT_MAX_SUPERSTEPS);
+    TELEPORTATION_PROBABILITY = configuration.getFloat(
+        RandomWalkVertex.TELEPORTATION_PROBABILITY,
+        DEFAULT_TELEPORTATION_PROBABILITY);
+    initializeSources(configuration);
+    // Mark ready only once every parameter has been loaded.
+    INITIALIZED = true;
+  }
+
+  @Override
+  public void preSuperstep() {
+    // No per-superstep work is needed.
+  }
+
+  @Override
+  public void postSuperstep() {
+    // No per-superstep work is needed.
+  }
+
+  @Override
+  public void postApplication() {
+    // Nothing to release.
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.giraph.io.TextVertexOutputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Simple text vertex output format for graphs with long ids, double vertex
+ * values and float edge weights. Each vertex is written as its id and its
+ * value separated by a tab character; edges are not written.
+ */
+public class VertexWithDoubleValueFloatEdgeTextOutputFormat extends
+    TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
+  @Override
+  public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
+  createVertexWriter(TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    RecordWriter<Text, Text> recordWriter =
+        textOutputFormat.getRecordWriter(context);
+    return new VertexWithDoubleValueWriter(recordWriter);
+  }
+
+  /**
+   * Vertex writer used with
+   * {@link VertexWithDoubleValueFloatEdgeTextOutputFormat}.
+   */
+  public static class VertexWithDoubleValueWriter extends
+      TextVertexOutputFormat.TextVertexWriter<LongWritable,
+      DoubleWritable, FloatWritable> {
+    /**
+     * Constructor with record writer.
+     * @param writer Where the vertices will finally be written.
+     */
+    public VertexWithDoubleValueWriter(RecordWriter<Text, Text> writer) {
+      super(writer);
+    }
+
+    /**
+     * Writes "id TAB value" as the record key, with a null record value.
+     * @param vertex The vertex to write.
+     */
+    @Override
+    public void writeVertex(
+        Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
+      throws IOException, InterruptedException {
+      StringBuilder output = new StringBuilder();
+      output.append(vertex.getId().get());
+      output.append('\t');
+      output.append(vertex.getValue().get());
+      getRecordWriter().write(new Text(output.toString()), null);
+    }
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.utils.UnmodifiableDoubleArrayIterator;
+import org.apache.giraph.utils.UnmodifiableLongFloatEdgeArrayIterable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Compact vertex representation with primitive arrays. The id, value, edge
+ * targets, edge weights and messages are kept as unboxed longs, doubles and
+ * floats; the accessors create new Writable wrappers on each call, so
+ * mutating a returned wrapper does not change the vertex.
+ */
+public abstract class LongDoubleFloatDoubleEdgeListVertex extends
+    Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+  /** Shared empty message array; zero-length, so sharing it is safe */
+  private static final double[] NO_MESSAGES = new double[0];
+
+  /** long represented vertex id */
+  private long id;
+  /** double represented vertex value */
+  private double value;
+  /** long array of neighbor vertex IDs */
+  private long[] neighbors;
+  /** float array of edge weights, index-aligned with neighbors */
+  private float[] edgeWeights;
+  /** double array of messages */
+  private double[] messages;
+
+  @Override
+  public void initialize(LongWritable vertexId, DoubleWritable vertexValue,
+      Map<LongWritable, FloatWritable> edges,
+      Iterable<DoubleWritable> messages) {
+    // A null id or value leaves the corresponding field unchanged.
+    if (vertexId != null) {
+      id = vertexId.get();
+    }
+    if (vertexValue != null) {
+      value = vertexValue.get();
+    }
+    int numEdges = (edges != null) ? edges.size() : 0;
+    neighbors = new long[numEdges];
+    edgeWeights = new float[numEdges];
+    if (edges != null) {
+      int n = 0;
+      for (Entry<LongWritable, FloatWritable> edge : edges.entrySet()) {
+        neighbors[n] = edge.getKey().get();
+        edgeWeights[n] = edge.getValue().get();
+        n++;
+      }
+    }
+    this.messages = toDoubleArray(messages);
+  }
+
+  @Override
+  public LongWritable getId() {
+    return new LongWritable(id);
+  }
+
+  @Override
+  public DoubleWritable getValue() {
+    return new DoubleWritable(value);
+  }
+
+  @Override
+  public void setValue(DoubleWritable vertexValue) {
+    value = vertexValue.get();
+  }
+
+  @Override
+  public Iterable<Edge<LongWritable, FloatWritable>> getEdges() {
+    return new UnmodifiableLongFloatEdgeArrayIterable(neighbors, edgeWeights);
+  }
+
+  /**
+   * Weight of the first edge pointing at the target.
+   * @param targetVertexId Target vertex id
+   * @return Edge weight, or null if there is no edge to the target
+   */
+  @Override
+  public FloatWritable getEdgeValue(LongWritable targetVertexId) {
+    int idx = indexOfEdge(targetVertexId.get());
+    return (idx < 0) ? null : new FloatWritable(edgeWeights[idx]);
+  }
+
+  @Override
+  public boolean hasEdge(LongWritable targetVertexId) {
+    return indexOfEdge(targetVertexId.get()) >= 0;
+  }
+
+  @Override
+  public int getNumEdges() {
+    return neighbors.length;
+  }
+
+  @Override
+  public void sendMessageToAllEdges(final DoubleWritable message) {
+    for (long neighbor : neighbors) {
+      sendMessage(new LongWritable(neighbor), message);
+    }
+  }
+
+  @Override
+  public Iterable<DoubleWritable> getMessages() {
+    return new Iterable<DoubleWritable>() {
+      @Override
+      public Iterator<DoubleWritable> iterator() {
+        // Reads the messages field when iteration starts.
+        return new UnmodifiableDoubleArrayIterator(messages);
+      }
+    };
+  }
+
+  @Override
+  public int getNumMessages() {
+    return messages.length;
+  }
+
+  /**
+   * Replaces the stored messages with a copy of the given ones.
+   * @param newMessages Messages to store (null is treated as empty)
+   */
+  @Override
+  public void putMessages(Iterable<DoubleWritable> newMessages) {
+    messages = toDoubleArray(newMessages);
+  }
+
+  @Override
+  void releaseResources() {
+    messages = NO_MESSAGES;
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    // Layout: id, value, edge count, all neighbor ids, all edge weights,
+    // message count, all messages. Must stay in sync with readFields().
+    out.writeLong(id);
+    out.writeDouble(value);
+    out.writeInt(neighbors.length);
+    for (long neighbor : neighbors) {
+      out.writeLong(neighbor);
+    }
+    for (float weight : edgeWeights) {
+      out.writeFloat(weight);
+    }
+    out.writeInt(messages.length);
+    for (double message : messages) {
+      out.writeDouble(message);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id = in.readLong();
+    value = in.readDouble();
+    int numEdges = in.readInt();
+    neighbors = new long[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      neighbors[n] = in.readLong();
+    }
+    edgeWeights = new float[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      edgeWeights[n] = in.readFloat();
+    }
+    int numMessages = in.readInt();
+    messages = new double[numMessages];
+    for (int n = 0; n < numMessages; n++) {
+      messages[n] = in.readDouble();
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (int) (id ^ (id >>> 32));
+    return result;
+  }
+
+  /**
+   * Two vertices of this class are equal when their ids are equal.
+   * @param obj Object to compare with
+   * @return true if obj is a vertex of this class with the same id
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof LongDoubleFloatDoubleEdgeListVertex)) {
+      return false;
+    }
+    LongDoubleFloatDoubleEdgeListVertex other =
+        (LongDoubleFloatDoubleEdgeListVertex) obj;
+    return id == other.id;
+  }
+
+  /**
+   * Finds the position of the first edge to a target vertex.
+   * @param targetId Id of the target vertex
+   * @return Index into neighbors/edgeWeights, or -1 if there is no such edge
+   */
+  private int indexOfEdge(long targetId) {
+    for (int i = 0; i < neighbors.length; i++) {
+      if (neighbors[i] == targetId) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Copies messages into a primitive array.
+   * @param source Messages to copy; must be iterable twice (may be null)
+   * @return Array with one entry per message, empty if source is null
+   */
+  private static double[] toDoubleArray(Iterable<DoubleWritable> source) {
+    if (source == null) {
+      return NO_MESSAGES;
+    }
+    double[] result = new double[Iterables.size(source)];
+    int n = 0;
+    for (DoubleWritable message : source) {
+      result[n++] = message.get();
+    }
+    return result;
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.giraph.utils.UnmodifiableDoubleArrayIterator;
+import org.apache.giraph.utils.UnmodifiableLongNullEdgeArrayIterable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Compact vertex representation with primitive arrays and null edges.
+ */
+public abstract class LongDoubleNullDoubleVertex extends
+ Vertex<LongWritable, DoubleWritable, NullWritable, DoubleWritable> {
+
+ /** long represented vertex id */
+ private long id;
+ /** double represented vertex value */
+ private double value;
+ /** long array of neighbor vertex ids */
+ private long[] neighbors;
+ /** double array of messages */
+ private double[] messages;
+
+ @Override
+ public void initialize(LongWritable vertexId, DoubleWritable vertexValue,
+ Map<LongWritable, NullWritable> edges,
+ Iterable<DoubleWritable> messages) {
+ id = vertexId.get();
+ value = vertexValue.get();
+ neighbors = new long[(edges != null) ? edges.size() : 0];
+ int n = 0;
+ if (edges != null) {
+ for (LongWritable neighbor : edges.keySet()) {
+ neighbors[n++] = neighbor.get();
+ }
+ }
+ this.messages =
+ new double[(messages != null) ? Iterables.size(messages) : 0];
+ if (messages != null) {
+ n = 0;
+ for (DoubleWritable message : messages) {
+ this.messages[n++] = message.get();
+ }
+ }
+ }
+
+ @Override
+ public LongWritable getId() {
+ return new LongWritable(id);
+ }
+
+ @Override
+ public DoubleWritable getValue() {
+ return new DoubleWritable(value);
+ }
+
+ @Override
+ public void setValue(DoubleWritable vertexValue) {
+ value = vertexValue.get();
+ }
+
+ @Override
+ public Iterable<Edge<LongWritable, NullWritable>> getEdges() {
+ return new UnmodifiableLongNullEdgeArrayIterable(neighbors);
+ }
+
+ @Override
+ public NullWritable getEdgeValue(LongWritable targetVertexId) {
+ return NullWritable.get();
+ }
+
+ @Override
+ public boolean hasEdge(LongWritable targetVertexId) {
+ for (long neighbor : neighbors) {
+ if (neighbor == targetVertexId.get()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int getNumEdges() {
+ return neighbors.length;
+ }
+
+ @Override
+ public void sendMessageToAllEdges(final DoubleWritable message) {
+ for (long neighbor : neighbors) {
+ sendMessage(new LongWritable(neighbor), message);
+ }
+ }
+
+ @Override
+ public Iterable<DoubleWritable> getMessages() {
+ return new Iterable<DoubleWritable>() {
+ @Override
+ public Iterator<DoubleWritable> iterator() {
+ return new UnmodifiableDoubleArrayIterator(messages);
+ }
+ };
+ }
+
+ @Override
+ public void putMessages(Iterable<DoubleWritable> newMessages) {
+ messages = new double[Iterables.size(newMessages)];
+ int n = 0;
+ for (DoubleWritable message : newMessages) {
+ messages[n++] = message.get();
+ }
+ }
+
+ @Override
+ void releaseResources() {
+ messages = new double[0];
+ }
+
+ @Override
+ public void write(final DataOutput out) throws IOException {
+ out.writeLong(id);
+ out.writeDouble(value);
+ out.writeInt(neighbors.length);
+ for (int n = 0; n < neighbors.length; n++) {
+ out.writeLong(neighbors[n]);
+ }
+ out.writeInt(messages.length);
+ for (int n = 0; n < messages.length; n++) {
+ out.writeDouble(messages[n]);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id = in.readLong();
+ value = in.readDouble();
+ int numEdges = in.readInt();
+ neighbors = new long[numEdges];
+ for (int n = 0; n < numEdges; n++) {
+ neighbors[n] = in.readLong();
+ }
+ int numMessages = in.readInt();
+ messages = new double[numMessages];
+ for (int n = 0; n < numMessages; n++) {
+ messages[n] = in.readDouble();
+ }
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1383115&r1=1383114&r2=1383115&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Mon Sep 10 21:32:14 2012
@@ -18,9 +18,20 @@
package org.apache.giraph.utils;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.MasterCompute;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.WorkerContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -29,12 +40,8 @@ import org.apache.zookeeper.server.Serve
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
/**
* A base class for running internal tests on a vertex
@@ -68,8 +75,11 @@ public class InternalVertexRunner {
* @return linewise output data
* @throws Exception
*/
- public static Iterable<String> run(Class<?> vertexClass,
- Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
+ @SuppressWarnings("rawtypes")
+ public static Iterable<String> run(
+ Class<? extends Vertex> vertexClass,
+ Class<? extends VertexInputFormat> vertexInputFormatClass,
+ Class<? extends VertexOutputFormat> vertexOutputFormatClass,
Map<String, String> params, String... data) throws Exception {
return run(vertexClass, null, vertexInputFormatClass,
vertexOutputFormatClass, params, data);
@@ -89,10 +99,42 @@ public class InternalVertexRunner {
* @return linewise output data
* @throws Exception
*/
- public static Iterable<String> run(Class<?> vertexClass,
- Class<?> vertexCombinerClass, Class<?> vertexInputFormatClass,
- Class<?> vertexOutputFormatClass, Map<String, String> params,
+ @SuppressWarnings("rawtypes")
+ public static Iterable<String> run(
+ Class<? extends Vertex> vertexClass,
+ Class<? extends VertexCombiner> vertexCombinerClass,
+ Class<? extends VertexInputFormat> vertexInputFormatClass,
+ Class<? extends VertexOutputFormat> vertexOutputFormatClass,
+ Map<String, String> params,
String... data) throws Exception {
+ return InternalVertexRunner.run(vertexClass, vertexCombinerClass,
+ vertexInputFormatClass, vertexOutputFormatClass, null, null, params,
+ data);
+ }
+
+ /**
+ * Attempts to run the vertex internally in the current JVM, reading from and
+ * writing to a temporary folder on local disk. Will start its own zookeeper
+ * instance.
+ * @param vertexClass the vertex class to instantiate
+ * @param vertexCombinerClass the vertex combiner to use (or null)
+ * @param vertexInputFormatClass the inputformat to use
+ * @param vertexOutputFormatClass the outputformat to use
+ * @param workerContextClass the worker context to use
+ * @param masterComputeClass the master compute class to use
+ * @param params a map of parameters to add to the hadoop configuration
+ * @param data linewise input data
+ * @return linewise output data
+ * @throws Exception
+ */
+ @SuppressWarnings("rawtypes")
+ public static Iterable<String> run(Class<? extends Vertex> vertexClass,
+ Class<? extends VertexCombiner> vertexCombinerClass,
+ Class<? extends VertexInputFormat> vertexInputFormatClass,
+ Class<? extends VertexOutputFormat> vertexOutputFormatClass,
+ Class<? extends WorkerContext> workerContextClass,
+ Class<? extends MasterCompute> masterComputeClass,
+ Map<String, String> params, String... data) throws Exception {
File tmpDir = null;
try {
@@ -112,10 +154,15 @@ public class InternalVertexRunner {
job.setVertexClass(vertexClass);
job.setVertexInputFormatClass(vertexInputFormatClass);
job.setVertexOutputFormatClass(vertexOutputFormatClass);
-
+ if (workerContextClass != null) {
+ job.setWorkerContextClass(workerContextClass);
+ }
if (vertexCombinerClass != null) {
job.setVertexCombinerClass(vertexCombinerClass);
}
+ if (masterComputeClass != null) {
+ job.setMasterComputeClass(masterComputeClass);
+ }
job.setWorkerConfiguration(1, 1, 100.0f);
Configuration conf = job.getConfiguration();
@@ -184,8 +231,6 @@ public class InternalVertexRunner {
}
}
-
-
/**
* Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
*/
Added: giraph/trunk/src/main/java/org/apache/giraph/utils/MathUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/MathUtils.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/MathUtils.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/MathUtils.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+import java.util.Iterator;
+
+/**
+ * Static helpers for arithmetic on sequences of Hadoop writables.
+ */
+public class MathUtils {
+
+  /**
+   * Not instantiable: every member is static.
+   */
+  private MathUtils() {
+  }
+
+  /**
+   * Adds up every value produced by an iterable, in iteration order.
+   * @param values double values to add
+   * @return total of the values (0 when there are none)
+   */
+  public static double sum(Iterable<DoubleWritable> values) {
+    double total = 0;
+    for (DoubleWritable value : values) {
+      total += value.get();
+    }
+    return total;
+  }
+
+  /**
+   * Adds up every value remaining in an iterator, consuming it.
+   * @param values double values to add
+   * @return total of the values (0 when there are none)
+   */
+  public static double sum(Iterator<DoubleWritable> values) {
+    double total = 0;
+    for (Iterator<DoubleWritable> it = values; it.hasNext();) {
+      total += it.next().get();
+    }
+    return total;
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * {@link UnmodifiableIterator} over a primitive double array. Each element
+ * is returned wrapped in a new {@link DoubleWritable}.
+ */
+public class UnmodifiableDoubleArrayIterator extends
+    UnmodifiableIterator<DoubleWritable> {
+  /** Array to iterate over */
+  private final double[] doubleArray;
+  /** Offset to array */
+  private int offset;
+
+  /**
+   * Constructor with array to iterate over.
+   * @param doubleArray Array to iterate over.
+   */
+  public UnmodifiableDoubleArrayIterator(double[] doubleArray) {
+    this.doubleArray = doubleArray;
+    offset = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return offset < doubleArray.length;
+  }
+
+  /**
+   * Returns the next element wrapped in a new writable.
+   * @return Next element
+   * @throws java.util.NoSuchElementException if the array is exhausted
+   */
+  @Override
+  public DoubleWritable next() {
+    if (!hasNext()) {
+      // Iterator contract: signal exhaustion with NoSuchElementException
+      // rather than leaking an ArrayIndexOutOfBoundsException.
+      throw new java.util.NoSuchElementException();
+    }
+    return new DoubleWritable(doubleArray[offset++]);
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.LongWritable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * {@link UnmodifiableIterator} over a primitive long array. Each element is
+ * returned wrapped in a new {@link LongWritable}.
+ */
+public class UnmodifiableLongArrayIterator extends
+    UnmodifiableIterator<LongWritable> {
+  /** Array to iterate over */
+  private final long[] longArray;
+  /** Offset to array */
+  private int offset;
+
+  /**
+   * Constructor with array to iterate over.
+   * @param longArray Array to iterate over.
+   */
+  public UnmodifiableLongArrayIterator(long[] longArray) {
+    this.longArray = longArray;
+    offset = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return offset < longArray.length;
+  }
+
+  /**
+   * Returns the next element wrapped in a new writable.
+   * @return Next element
+   * @throws java.util.NoSuchElementException if the array is exhausted
+   */
+  @Override
+  public LongWritable next() {
+    if (!hasNext()) {
+      // Iterator contract: signal exhaustion with NoSuchElementException
+      // rather than leaking an ArrayIndexOutOfBoundsException.
+      throw new java.util.NoSuchElementException();
+    }
+    return new LongWritable(longArray[offset++]);
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * {@link UnmodifiableIterator} over a pair of index-aligned primitive arrays
+ * (target ids and float edge weights), also usable as an {@link Iterable}.
+ * The weight array is expected to be at least as long as the id array.
+ */
+public class UnmodifiableLongFloatEdgeArrayIterable extends
+    UnmodifiableIterator<Edge<LongWritable, FloatWritable>> implements
+    Iterable<Edge<LongWritable, FloatWritable>> {
+  /** Array of IDs to iterate over */
+  private final long[] longArray;
+  /** Array of weights to iterate over */
+  private final float[] floatArray;
+  /** Offset to array */
+  private int offset;
+
+  /**
+   * Constructor with arrays to iterate over.
+   * @param longArray Array of IDs to iterate over.
+   * @param floatArray Array of weights to iterate over.
+   */
+  public UnmodifiableLongFloatEdgeArrayIterable(final long[] longArray,
+      final float[] floatArray) {
+    this.longArray = longArray;
+    this.floatArray = floatArray;
+    offset = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return offset < longArray.length;
+  }
+
+  /**
+   * Returns the next edge as new writables.
+   * @return Next edge
+   * @throws java.util.NoSuchElementException if the arrays are exhausted
+   */
+  @Override
+  public Edge<LongWritable, FloatWritable> next() {
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException();
+    }
+    Edge<LongWritable, FloatWritable> retval =
+        new Edge<LongWritable, FloatWritable>(
+            new LongWritable(longArray[offset]),
+            new FloatWritable(floatArray[offset]));
+    offset++;
+    return retval;
+  }
+
+  /**
+   * Returns an iterator that starts again at the first edge. A new instance
+   * is returned so the iterable can be traversed more than once; returning
+   * {@code this} would make every traversal after the first one empty.
+   * @return New iterator over the same arrays
+   */
+  @Override
+  public Iterator<Edge<LongWritable, FloatWritable>> iterator() {
+    return new UnmodifiableLongFloatEdgeArrayIterable(longArray, floatArray);
+  }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * {@link UnmodifiableIterator} over a primitive long array of target ids,
+ * producing edges whose value is {@link NullWritable}. Also usable as an
+ * {@link Iterable}.
+ */
+public class UnmodifiableLongNullEdgeArrayIterable extends
+    UnmodifiableIterator<Edge<LongWritable, NullWritable>> implements
+    Iterable<Edge<LongWritable, NullWritable>> {
+  /** Array of target ids to iterate over */
+  private final long[] longArray;
+  /** Offset to array */
+  private int offset;
+
+  /**
+   * Constructor with array to iterate over.
+   * @param longArray Array to iterate over.
+   */
+  public UnmodifiableLongNullEdgeArrayIterable(final long[] longArray) {
+    this.longArray = longArray;
+    offset = 0;
+  }
+
+  /**
+   * Returns an iterator that starts again at the first edge. A new instance
+   * is returned so the iterable can be traversed more than once; returning
+   * {@code this} would make every traversal after the first one empty.
+   * @return New iterator over the same array
+   */
+  @Override
+  public Iterator<Edge<LongWritable, NullWritable>> iterator() {
+    return new UnmodifiableLongNullEdgeArrayIterable(longArray);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return offset < longArray.length;
+  }
+
+  /**
+   * Returns the next edge with a new id writable and a null edge value.
+   * @return Next edge
+   * @throws java.util.NoSuchElementException if the array is exhausted
+   */
+  @Override
+  public Edge<LongWritable, NullWritable> next() {
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException();
+    }
+    Edge<LongWritable, NullWritable> retval =
+        new Edge<LongWritable, NullWritable>(
+            new LongWritable(longArray[offset]), NullWritable.get());
+    offset++;
+    return retval;
+  }
+}
Added: giraph/trunk/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java (added)
+++ giraph/trunk/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+
+import org.apache.giraph.examples.RandomWalkVertex.
+ RandomWalkVertexMasterCompute;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for {@link RandomWalkWithRestartVertex}
+ */
+public class RandomWalkWithRestartVertexTest {
+
+ /**
+ * Absolute tolerance (0.01) used when comparing the computed steady state
+ * probabilities against the reference values.
+ */
+ private static final double EPSILON = 1e-2;
+
+ /**
+ * A local integration test on a small unweighted toy graph, restarting at
+ * vertex 12 with teleportation probability 0.25.
+ */
+ @Test
+ public void testToyData() throws Exception {
+
+ // A small graph
+ String[] graph = new String[] { "12 34 56", "34 78", "56 34 78", "78 34" };
+
+ Map<String, String> params = Maps.newHashMap();
+ params.put(RandomWalkWithRestartVertex.SOURCE_VERTEX, "12");
+ params.put(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, "30");
+ params.put(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, "0.25");
+
+ // Run internally
+ Iterable<String> results =
+ InternalVertexRunner.run(RandomWalkWithRestartVertex.class, null,
+ LongDoubleFloatDoubleTextInputFormat.class,
+ VertexWithDoubleValueFloatEdgeTextOutputFormat.class,
+ RandomWalkWorkerContext.class, RandomWalkVertexMasterCompute.class,
+ params, graph);
+
+ Map<Long, Double> steadyStateProbabilities =
+ parseSteadyStateProbabilities(results);
+ // values computed with external software
+ // 0.25, 0.354872, 0.09375, 0.301377
+ assertProbability(steadyStateProbabilities, 12L, 0.25);
+ assertProbability(steadyStateProbabilities, 34L, 0.354872);
+ assertProbability(steadyStateProbabilities, 56L, 0.09375);
+ assertProbability(steadyStateProbabilities, 78L, 0.301377);
+ }
+
+ /**
+ * A local integration test on a small weighted toy graph (edges written as
+ * {@code id:weight}), read with the normalizing input format and restarting
+ * at vertex 12 with teleportation probability 0.15.
+ */
+ @Test
+ public void testWeightedGraph() throws Exception {
+ // A small graph
+ String[] graph =
+ new String[] { "12 34:0.1 56:0.9", "34 78:0.9 56:0.1",
+ "56 12:0.1 34:0.8 78:0.1", "78 34:1.0" };
+
+ Map<String, String> params = Maps.newHashMap();
+ params.put(RandomWalkWithRestartVertex.SOURCE_VERTEX, "12");
+ params.put(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, "30");
+ params.put(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, "0.15");
+
+ // Run internally
+ Iterable<String> results =
+ InternalVertexRunner.run(RandomWalkWithRestartVertex.class, null,
+ NormalizingLongDoubleFloatDoubleTextInputFormat.class,
+ VertexWithDoubleValueFloatEdgeTextOutputFormat.class,
+ RandomWalkWorkerContext.class, RandomWalkVertexMasterCompute.class,
+ params, graph);
+
+ Map<Long, Double> steadyStateProbabilities =
+ parseSteadyStateProbabilities(results);
+ // values computed with external software
+ // 0.163365, 0.378932, 0.156886, 0.300816
+ assertProbability(steadyStateProbabilities, 12L, 0.163365);
+ assertProbability(steadyStateProbabilities, 34L, 0.378932);
+ assertProbability(steadyStateProbabilities, 56L, 0.156886);
+ assertProbability(steadyStateProbabilities, 78L, 0.300816);
+ }
+
+ /**
+ * Assert that the probability computed for a vertex matches the expected
+ * value within {@link #EPSILON}. A vertex missing from the output fails
+ * the test with a descriptive message instead of an unboxing NPE.
+ * @param probabilities Parsed steady state probabilities.
+ * @param vertexId Id of the vertex to check.
+ * @param expected Expected steady state probability.
+ */
+ private static void assertProbability(Map<Long, Double> probabilities,
+ long vertexId, double expected) {
+ Double actual = probabilities.get(vertexId);
+ if (actual == null) {
+ throw new AssertionError("No probability computed for vertex " + vertexId);
+ }
+ assertEquals("Probability of vertex " + vertexId, expected,
+ actual.doubleValue(), EPSILON);
+ }
+
+ /**
+ * Parse steady state probabilities. Each result line must start with the
+ * vertex id followed by its probability, separated by a tab; any further
+ * tab-separated fields are ignored.
+ * @param results The steady state probabilities in text format.
+ * @return A map from vertex id to steady state probability.
+ */
+ private static Map<Long, Double> parseSteadyStateProbabilities(
+ Iterable<String> results) {
+ Map<Long, Double> result = Maps.newHashMap();
+ for (String s : results) {
+ String[] tokens = s.split("\\t");
+ result.put(Long.parseLong(tokens[0]), Double.parseDouble(tokens[1]));
+ }
+ return result;
+ }
+}
|