giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ereis...@apache.org
Subject svn commit: r1383115 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/utils/ src/test/java/org/apache/giraph/examples/
Date Mon, 10 Sep 2012 21:32:15 GMT
Author: ereisman
Date: Mon Sep 10 21:32:14 2012
New Revision: 1383115

URL: http://svn.apache.org/viewvc?rev=1383115&view=rev
Log:
GIRAPH-191: Random Walks On Graphs (Gianmarco De Francisci Morales via ereisman)

Added:
    giraph/trunk/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/MathUtils.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java
    giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java
    giraph/trunk/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/checkstyle.xml
    giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1383115&r1=1383114&r2=1383115&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Mon Sep 10 21:32:14 2012
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-191: Random walks on graphs (Gianmarco De Francisci Morales via ereisman)
+
   GIRAPH-320: Provide a runtime configuration for choosing the
   log level (aching via ereisman)
 

Modified: giraph/trunk/checkstyle.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/checkstyle.xml?rev=1383115&r1=1383114&r2=1383115&view=diff
==============================================================================
--- giraph/trunk/checkstyle.xml (original)
+++ giraph/trunk/checkstyle.xml Mon Sep 10 21:32:14 2012
@@ -227,7 +227,9 @@
     <module name="MethodLength">
       <property name="max" value="200"/>
     </module>
-    <module name="ParameterNumber"/>
+    <module name="ParameterNumber">
+      <property name="max" value="8"/>
+    </module>
 
     <!-- Checks for whitespace (tree walker)                 -->
     <!-- See http://checkstyle.sf.net/config_whitespace.html -->

Added: giraph/trunk/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.utils.MathUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Combiner that folds every double-valued message addressed to a vertex
+ * into a single message carrying their sum.
+ */
+public class DoubleSumCombiner extends
+    VertexCombiner<LongWritable, DoubleWritable> {
+  /**
+   * Replaces all messages for one vertex by their sum.
+   *
+   * @param vertexIndex Id of the destination vertex (not used here).
+   * @param messages Messages to be combined.
+   * @return A one-element iterable holding the sum of the messages.
+   * @throws IOException As permitted by the overridden method.
+   */
+  @Override
+  public Iterable<DoubleWritable> combine(LongWritable vertexIndex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    double total = MathUtils.sum(messages);
+    DoubleWritable combined = new DoubleWritable(total);
+    return Collections.singleton(combined);
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.io.TextVertexInputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
+ * unweighted graphs with long ids. Each line consists of: vertex neighbor1
+ * neighbor2 ...
+ * <p>
+ * Tokens are separated by one or more tabs/spaces. Every out-edge gets the
+ * weight 1 / (number of neighbor tokens on the line), so the out-edge weights
+ * of a vertex sum to (approximately) one and can serve as random-walk
+ * transition probabilities. A neighbor listed k times gets weight
+ * k / (number of neighbor tokens). A line holding only a vertex id yields a
+ * vertex without out-edges.
+ */
+public class LongDoubleFloatDoubleTextInputFormat
+    extends
+    TextVertexInputFormat<LongWritable, DoubleWritable,
+    FloatWritable, DoubleWritable> {
+
+  @Override
+  public VertexReader<LongWritable,
+    DoubleWritable, FloatWritable, DoubleWritable>
+  createVertexReader(InputSplit split, TaskAttemptContext context)
+    throws IOException {
+    return new LongDoubleFloatDoubleVertexReader(
+        textInputFormat.createRecordReader(split, context));
+  }
+
+  /**
+   * Vertex reader associated with {@link LongDoubleFloatDoubleTextInputFormat}.
+   */
+  public static class LongDoubleFloatDoubleVertexReader extends
+    TextVertexInputFormat.TextVertexReader<LongWritable, DoubleWritable,
+    FloatWritable, DoubleWritable> {
+    /**
+     * Separator of the vertex and neighbors. A run of blanks counts as a
+     * single separator, so repeated spaces/tabs do not produce empty tokens.
+     */
+    private static final Pattern SEPARATOR = Pattern.compile("[\t ]+");
+
+    /**
+     * Constructor with the line reader.
+     * @param lineReader Internal line reader.
+     */
+    public LongDoubleFloatDoubleVertexReader(
+        RecordReader<LongWritable, Text> lineReader) {
+      super(lineReader);
+    }
+
+    /**
+     * Builds the vertex described by the current input line. Malformed ids
+     * surface as a {@link NumberFormatException} from {@link Long#parseLong}.
+     * @return The vertex, valued {@code new DoubleWritable()}, with uniformly
+     *         weighted out-edges.
+     * @throws IOException Propagated from the underlying record reader.
+     * @throws InterruptedException Propagated from the underlying reader.
+     */
+    @Override
+    public Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+    getCurrentVertex() throws IOException, InterruptedException {
+      Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        vertex = BspUtils.<LongWritable, DoubleWritable,
+        FloatWritable, DoubleWritable>createVertex(getContext()
+            .getConfiguration());
+
+      // Trim first so leading/trailing blanks cannot produce an empty id token.
+      String[] tokens = SEPARATOR.split(
+          getRecordReader().getCurrentValue().toString().trim());
+      int numNeighbors = tokens.length - 1;
+      Map<LongWritable, FloatWritable> edges =
+          Maps.newHashMapWithExpectedSize(numNeighbors);
+      if (numNeighbors > 0) {
+        float weight = 1.0f / numNeighbors;
+        for (int n = 1; n < tokens.length; n++) {
+          LongWritable target = new LongWritable(Long.parseLong(tokens[n]));
+          FloatWritable existing = edges.get(target);
+          if (existing == null) {
+            edges.put(target, new FloatWritable(weight));
+          } else {
+            // Repeated neighbor: accumulate its share instead of overwriting
+            // it, otherwise the out-edge weights would no longer sum to one.
+            existing.set(existing.get() + weight);
+          }
+        }
+      }
+
+      LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
+      vertex.initialize(vertexId, new DoubleWritable(), edges,
+          Lists.<DoubleWritable>newArrayList());
+
+      return vertex;
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.io.TextVertexInputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
+ * weighted graphs with long ids. Each line consists of: vertex
+ * neighbor1:weight1 neighbor2:weight2 ...
+ * <p>
+ * The out-edge weights of each vertex are L1-normalized while reading, so
+ * they can serve as random-walk transition probabilities. A line holding
+ * only a vertex id yields a vertex without out-edges.
+ */
+public class NormalizingLongDoubleFloatDoubleTextInputFormat
+    extends
+    TextVertexInputFormat<LongWritable, DoubleWritable,
+      FloatWritable, DoubleWritable> {
+
+  @Override
+  public VertexReader<LongWritable, DoubleWritable,
+  FloatWritable, DoubleWritable> createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new NormalizingLongDoubleFloatDoubleVertexReader(
+        textInputFormat.createRecordReader(split, context));
+  }
+
+  /**
+   * Vertex reader associated with
+   * {@link NormalizingLongDoubleFloatDoubleTextInputFormat}.
+   */
+  public static class NormalizingLongDoubleFloatDoubleVertexReader
+      extends
+      TextVertexInputFormat.TextVertexReader<LongWritable, DoubleWritable,
+      FloatWritable, DoubleWritable> {
+    /** Separator of the vertex and neighbors */
+    private static final Pattern EDGE_SEPARATOR = Pattern.compile("\\s+");
+    /** Separator of the edge id and edge weight */
+    private static final Pattern WEIGHT_SEPARATOR = Pattern.compile(":");
+
+    /**
+     * Constructor with the line reader.
+     * @param lineReader
+     *          Internal line reader.
+     */
+    public NormalizingLongDoubleFloatDoubleVertexReader(
+        RecordReader<LongWritable, Text> lineReader) {
+      super(lineReader);
+    }
+
+    /**
+     * Builds the vertex described by the current input line, with its
+     * out-edge weights normalized to sum to one.
+     * @return The vertex, valued {@code new DoubleWritable()}.
+     * @throws IOException Propagated from the underlying record reader.
+     * @throws InterruptedException Propagated from the underlying reader.
+     */
+    @Override
+    public Vertex<LongWritable, DoubleWritable,
+    FloatWritable, DoubleWritable> getCurrentVertex()
+      throws IOException, InterruptedException {
+      Vertex<LongWritable, DoubleWritable,
+      FloatWritable, DoubleWritable> vertex = BspUtils
+          .<LongWritable, DoubleWritable,
+          FloatWritable, DoubleWritable>createVertex(getContext()
+              .getConfiguration());
+
+      // Trim first so leading blanks cannot produce an empty id token.
+      String[] tokens = EDGE_SEPARATOR.split(getRecordReader()
+          .getCurrentValue().toString().trim());
+      Map<LongWritable, FloatWritable> edges = Maps
+          .newHashMapWithExpectedSize(tokens.length - 1);
+      parse(tokens, edges);
+      // Vertexes without out-edges (dangling vertexes) are legal input, but
+      // normalize() rejects empty edge sets, so only normalize when needed.
+      if (!edges.isEmpty()) {
+        normalize(edges);
+      }
+
+      LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
+      vertex.initialize(vertexId, new DoubleWritable(), edges,
+          Lists.<DoubleWritable>newArrayList());
+
+      return vertex;
+    }
+
+    /**
+     * Parse a set of tokens into a map ID -> weight. The first token (the
+     * vertex id itself) is skipped. Weights of a neighbor id that appears
+     * more than once are summed.
+     * @param tokens The tokens to be parsed.
+     * @param edges The map that will contain the result of the parsing.
+     * @throws IllegalArgumentException If an edge token is not of the form
+     *           id:weight, or the id or weight is not a valid number.
+     */
+    static void parse(String[] tokens, Map<LongWritable, FloatWritable> edges) {
+      for (int n = 1; n < tokens.length; n++) {
+        String[] parts = WEIGHT_SEPARATOR.split(tokens[n]);
+        if (parts.length != 2) {
+          throw new IllegalArgumentException(
+              "Malformed edge '" + tokens[n] + "', expected id:weight");
+        }
+        LongWritable target = new LongWritable(Long.parseLong(parts[0]));
+        float weight = Float.parseFloat(parts[1]);
+        FloatWritable existing = edges.get(target);
+        if (existing == null) {
+          edges.put(target, new FloatWritable(weight));
+        } else {
+          existing.set(existing.get() + weight);
+        }
+      }
+    }
+
+    /**
+     * Normalize the edges with L1 normalization (each weight is divided by
+     * the sum of all weights), in place.
+     * @param edges The edges to be normalized.
+     * @throws IllegalArgumentException If {@code edges} is null or empty, or
+     *           its weights sum to zero (dividing would yield NaN/infinity).
+     */
+    static void normalize(Map<LongWritable, FloatWritable> edges) {
+      if (edges == null || edges.size() == 0) {
+        throw new IllegalArgumentException(
+            "Cannot normalize an empty set of edges");
+      }
+      float normalizer = 0.0f;
+      for (FloatWritable weight : edges.values()) {
+        normalizer += weight.get();
+      }
+      if (normalizer == 0.0f) {
+        throw new IllegalArgumentException(
+            "Cannot normalize edges whose weights sum to zero");
+      }
+      for (FloatWritable weight : edges.values()) {
+        weight.set(weight.get() / normalizer);
+      }
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.DoubleSumAggregator;
+import org.apache.giraph.graph.DefaultMasterCompute;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.LongDoubleFloatDoubleEdgeListVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+/**
+ * Abstract base for random-walk computations. Each vertex stores the
+ * probability of the walk being at it and, on every superstep, forwards that
+ * probability along its out-edges, scaled by each edge's weight. Subclasses
+ * provide the starting distribution and the update rule for incoming mass.
+ */
+public abstract class RandomWalkVertex extends
+    LongDoubleFloatDoubleEdgeListVertex {
+  /** Configuration parameter for the number of supersteps to execute */
+  static final String MAX_SUPERSTEPS =
+      RandomWalkVertex.class.getName() + ".maxSupersteps";
+  /** Configuration parameter for the teleportation probability */
+  static final String TELEPORTATION_PROBABILITY =
+      RandomWalkVertex.class.getName() + ".teleportationProbability";
+  /** Name of aggregator for dangling nodes */
+  static final String DANGLING = "dangling";
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(RandomWalkVertex.class);
+  /** State probability of the vertex */
+  protected final DoubleWritable d = new DoubleWritable();
+
+  /**
+   * Provides the probability this vertex starts with at superstep 0.
+   * @return The initial probability value.
+   */
+  protected abstract double initialProbability();
+
+  /**
+   * Computes the new probability of this vertex from the incoming mass.
+   * @param messages Probability mass received from in-neighbors.
+   * @param teleportationProbability Probability of teleporting to another
+   *          vertex.
+   * @return The updated probability value.
+   */
+  protected abstract double recompute(Iterable<DoubleWritable> messages,
+      double teleportationProbability);
+
+  @Override
+  public void compute(Iterable<DoubleWritable> messages) throws IOException {
+    // The teleportation probability is only looked up after superstep 0.
+    double stateProbability = (getSuperstep() > 0) ?
+        recompute(messages, teleportationProbability()) :
+        initialProbability();
+    d.set(stateProbability);
+    setValue(d);
+
+    // A vertex without out-edges hands its mass to the dangling aggregator,
+    // whose total is readable in the next superstep.
+    if (getNumEdges() == 0) {
+      aggregate(DANGLING, d);
+    }
+
+    // Run for the configured number of supersteps; checking convergence via
+    // an aggregator would be an alternative stopping rule.
+    if (getSuperstep() >= maxSupersteps()) {
+      voteToHalt();
+      return;
+    }
+    for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
+      sendMessage(edge.getTargetVertexId(),
+          new DoubleWritable(stateProbability * edge.getValue().get()));
+    }
+  }
+
+  /**
+   * Fetches the number of supersteps to execute from the
+   * {@link RandomWalkWorkerContext}.
+   * @return number of supersteps to execute
+   */
+  private int maxSupersteps() {
+    RandomWalkWorkerContext context =
+        (RandomWalkWorkerContext) getWorkerContext();
+    return context.getMaxSupersteps();
+  }
+
+  /**
+   * Fetches the teleportation probability from the
+   * {@link RandomWalkWorkerContext}.
+   * @return teleportation probability
+   */
+  protected double teleportationProbability() {
+    RandomWalkWorkerContext context =
+        (RandomWalkWorkerContext) getWorkerContext();
+    return context.getTeleportationProbability();
+  }
+
+  /**
+   * Master compute associated with {@link RandomWalkVertex}. It registers the
+   * dangling-node aggregator and logs its value every superstep.
+   */
+  public static class RandomWalkVertexMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void compute() {
+      // TODO This is a good place to implement halting by checking convergence.
+      DoubleWritable dangling =
+          this.<DoubleWritable>getAggregatedValue(RandomWalkVertex.DANGLING);
+      double danglingContribution = dangling.get();
+      LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
+          danglingContribution);
+    }
+
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(RandomWalkVertex.DANGLING, DoubleSumAggregator.class);
+    }
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.utils.MathUtils;
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * "RandomWalkWithRestart": a random walk that, at every step, jumps back to
+ * one of the source vertexes with the teleportation probability. The
+ * resulting probability of being at a vertex can be read as a measure of its
+ * proximity to the source vertexes.
+ */
+public class RandomWalkWithRestartVertex extends RandomWalkVertex {
+
+  /** Configuration parameter for the source vertex */
+  static final String SOURCE_VERTEX =
+      RandomWalkWithRestartVertex.class.getName() + ".sourceVertex";
+
+  /**
+   * Gives typed access to the worker context of this job.
+   * @return The worker context cast to {@link RandomWalkWorkerContext}.
+   */
+  private RandomWalkWorkerContext randomWalkContext() {
+    return (RandomWalkWorkerContext) getWorkerContext();
+  }
+
+  /**
+   * Tells whether this vertex belongs to the preference vector.
+   * @return true if the vertex being computed is a source vertex.
+   */
+  private boolean isSourceVertex() {
+    return randomWalkContext().isSource(getId().get());
+  }
+
+  /**
+   * Counts the vertexes in the preference vector.
+   * @return The number of source vertexes.
+   */
+  private int numSourceVertexes() {
+    return randomWalkContext().numSources();
+  }
+
+  /**
+   * Reads the total probability aggregated from dangling vertexes.
+   * @return The cumulated probability from dangling nodes.
+   */
+  private double getDanglingProbability() {
+    DoubleWritable aggregated =
+        this.<DoubleWritable>getAggregatedValue(RandomWalkVertex.DANGLING);
+    return aggregated.get();
+  }
+
+  /**
+   * Start with a uniform distribution.
+   * @return A uniform probability over all the vertexes.
+   */
+  @Override
+  protected double initialProbability() {
+    return 1.0 / getTotalNumVertices();
+  }
+
+  @Override
+  protected double recompute(Iterable<DoubleWritable> transitionProbabilities,
+      double teleportationProbability) {
+    // Mass arriving over the in-edges.
+    double incoming = MathUtils.sum(transitionProbabilities);
+    // Dangling vertexes spread their mass uniformly over all vertexes
+    // (weakly preferential treatment of dangling nodes).
+    double dangling = getDanglingProbability() / getTotalNumVertices();
+    // Only (1 - teleportationProbability) of the mass keeps walking...
+    double walked = (incoming + dangling) * (1 - teleportationProbability);
+    if (!isSourceVertex()) {
+      return walked;
+    }
+    // ...the remainder restarts uniformly at one of the source vertexes.
+    return walked + teleportationProbability / numSourceVertexes();
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Set;
+
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Worker context for random walks.
+ */
+public class RandomWalkWorkerContext extends WorkerContext {
+  /** Default maximum number of iterations */
+  private static final int DEFAULT_MAX_SUPERSTEPS = 30;
+  /** Default teleportation probability */
+  private static final float DEFAULT_TELEPORTATION_PROBABILITY = 0.15f;
+  /** Maximum number of iterations */
+  private static int MAX_SUPERSTEPS;
+  /** Teleportation probability */
+  private static double TELEPORTATION_PROBABILITY;
+  /** Preference vector */
+  private static Set<Long> SOURCES;
+
+  /** Configuration parameter for the source vertex */
+  private static final String SOURCE_VERTEX = RandomWalkWithRestartVertex.class
+      .getName() + ".sourceVertex";
+
+  /** Logger */
+  private static final Logger LOG = Logger
+      .getLogger(RandomWalkWorkerContext.class);
+
+  /**
+   * @return The maximum number of iterations to perform.
+   */
+  public int getMaxSupersteps() {
+    if (MAX_SUPERSTEPS == 0) {
+      throw new IllegalStateException(
+          RandomWalkWorkerContext.class.getSimpleName() +
+              " was not initialized. Realunch your job " +
+              "by setting the appropriate WorkerContext");
+    }
+    return MAX_SUPERSTEPS;
+  }
+
+  /**
+   * @return The teleportation probability.
+   */
+  public double getTeleportationProbability() {
+    if (TELEPORTATION_PROBABILITY == 0) {
+      throw new IllegalStateException(
+          RandomWalkWorkerContext.class.getSimpleName() +
+              " was not initialized. Realunch your job " +
+              "by setting the appropriate WorkerContext");
+    }
+    return TELEPORTATION_PROBABILITY;
+  }
+
+  /**
+   * Checks if a vertex is a source.
+   * @param id The vertex ID to check.
+   * @return True if the vertex is a source in the preference vector.
+   */
+  public boolean isSource(long id) {
+    return SOURCES.contains(id);
+  }
+
+  /**
+   * @return The number of sources in the preference vector.
+   */
+  public int numSources() {
+    return SOURCES.size();
+  }
+
+  /**
+   * Initialize sources for Random Walk with Restart. First option
+   * (preferential) is single source given from the command line as a parameter.
+   * Second option is a file with a list of vertex IDs, one per line. In this
+   * second case the preference vector is a uniform distribution over these
+   * vertexes.
+   * @param configuration
+   *          The configuration.
+   */
+  private void initializeSources(Configuration configuration) {
+    ImmutableSet.Builder<Long> builder = ImmutableSet.builder();
+    long sourceVertex = configuration.getLong(SOURCE_VERTEX, Long.MIN_VALUE);
+    if (sourceVertex != Long.MIN_VALUE) {
+      builder.add(sourceVertex);
+    } else {
+      Path sourceFile = null;
+      try {
+        sourceFile = DistributedCache.getLocalCacheFiles(configuration)[0];
+        FileSystem fs = FileSystem.getLocal(configuration);
+        BufferedReader in = new BufferedReader(new InputStreamReader(
+            fs.open(sourceFile)));
+        String line;
+        while ((line = in.readLine()) != null) {
+          builder.add(Long.parseLong(line));
+        }
+        in.close();
+      } catch (IOException e) {
+        e.printStackTrace();
+        getContext().setStatus(
+            "Could not load local cache files: " + sourceFile);
+        LOG.error("Could not load local cache files: " + sourceFile);
+      }
+    }
+    SOURCES = builder.build();
+  }
+
+  @Override
+  public void preApplication() throws InstantiationException,
+      IllegalAccessException {
+    Configuration configuration = this.getContext().getConfiguration();
+    MAX_SUPERSTEPS = configuration.getInt(RandomWalkVertex.MAX_SUPERSTEPS,
+        DEFAULT_MAX_SUPERSTEPS);
+    TELEPORTATION_PROBABILITY = configuration.getFloat(
+        RandomWalkVertex.TELEPORTATION_PROBABILITY,
+        DEFAULT_TELEPORTATION_PROBABILITY);
+    initializeSources(configuration);
+  }
+
+  @Override
+  public void preSuperstep() {
+  }
+
+  @Override
+  public void postSuperstep() {
+  }
+
+  @Override
+  public void postApplication() {
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.giraph.io.TextVertexOutputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Simple vertex output format for weighted graphs.
+ * <p>
+ * Each vertex is written as one line holding its id and its double value,
+ * separated by a tab. Edges (and their float weights) are not written.
+ */
+public class VertexWithDoubleValueFloatEdgeTextOutputFormat extends
+    TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
+  /**
+   * Creates a writer that emits vertices through the underlying text
+   * record writer.
+   * @param context Task attempt context
+   * @return Vertex writer for this format
+   * @throws IOException if the record writer cannot be created
+   * @throws InterruptedException if interrupted while creating it
+   */
+  @Override
+  public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
+  createVertexWriter(TaskAttemptContext context) throws IOException,
+          InterruptedException {
+    RecordWriter<Text, Text> recordWriter =
+        textOutputFormat.getRecordWriter(context);
+    return new VertexWithDoubleValueWriter(recordWriter);
+  }
+
+  /**
+   * Vertex writer used with
+   * {@link VertexWithDoubleValueFloatEdgeTextOutputFormat}.
+   */
+  public static class VertexWithDoubleValueWriter extends
+    TextVertexOutputFormat.TextVertexWriter<LongWritable,
+    DoubleWritable, FloatWritable> {
+    /**
+     * Constructor with record writer.
+     * @param writer Where the vertices will finally be written.
+     */
+    public VertexWithDoubleValueWriter(RecordWriter<Text, Text> writer) {
+      super(writer);
+    }
+
+    /**
+     * Writes the vertex as "id\tvalue".
+     * @param vertex Vertex to write
+     * @throws IOException if the underlying writer fails
+     * @throws InterruptedException if interrupted while writing
+     */
+    @Override
+    public void writeVertex(
+        Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
+      throws IOException, InterruptedException {
+      StringBuilder output = new StringBuilder();
+      output.append(vertex.getId().get());
+      output.append('\t');
+      output.append(vertex.getValue().get());
+      // The whole formatted line is passed as the key; the value is null.
+      getRecordWriter().write(new Text(output.toString()), null);
+    }
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.utils.UnmodifiableDoubleArrayIterator;
+import org.apache.giraph.utils.UnmodifiableLongFloatEdgeArrayIterable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Compact vertex representation with primitive arrays.
+ * <p>
+ * The id, value, neighbor ids, edge weights and pending messages are stored
+ * as primitives. The Writable accessors box them on demand, so the objects
+ * they return are fresh copies: modifying them does not change the vertex.
+ * {@link #equals(Object)} and {@link #hashCode()} only consider the id.
+ */
+public abstract class LongDoubleFloatDoubleEdgeListVertex extends
+    Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+
+  /** long represented vertex id */
+  private long id;
+  /** double represented vertex value */
+  private double value;
+  /** long array of neighbor vertex IDs */
+  private long[] neighbors;
+  /** float array of edge weights; edgeWeights[i] belongs to neighbors[i] */
+  private float[] edgeWeights;
+  /** double array of messages */
+  private double[] messages;
+
+  /**
+   * Copies the given id, value, edges and messages into primitive storage.
+   * A null id or value leaves the current field untouched. Null edges or
+   * messages yield empty arrays.
+   *
+   * @param vertexId Vertex id (may be null)
+   * @param vertexValue Vertex value (may be null)
+   * @param edges Map from neighbor id to edge weight (may be null)
+   * @param messages Initial messages (may be null)
+   */
+  @Override
+  public void initialize(LongWritable vertexId, DoubleWritable vertexValue,
+          Map<LongWritable, FloatWritable> edges,
+          Iterable<DoubleWritable> messages) {
+    if (vertexId != null) {
+      id = vertexId.get();
+    }
+    if (vertexValue != null) {
+      value = vertexValue.get();
+    }
+    int numEdges = (edges != null) ? edges.size() : 0;
+    neighbors = new long[numEdges];
+    edgeWeights = new float[numEdges];
+    if (edges != null) {
+      int n = 0;
+      for (Entry<LongWritable, FloatWritable> neighbor : edges.entrySet()) {
+        neighbors[n] = neighbor.getKey().get();
+        edgeWeights[n] = neighbor.getValue().get();
+        n++;
+      }
+    }
+    // The messages Iterable is traversed twice: once to size the array and
+    // once to copy the values.
+    this.messages =
+        new double[(messages != null) ? Iterables.size(messages) : 0];
+    if (messages != null) {
+      int n = 0;
+      for (DoubleWritable message : messages) {
+        this.messages[n++] = message.get();
+      }
+    }
+  }
+
+  @Override
+  public LongWritable getId() {
+    return new LongWritable(id);
+  }
+
+  @Override
+  public DoubleWritable getValue() {
+    return new DoubleWritable(value);
+  }
+
+  @Override
+  public void setValue(DoubleWritable vertexValue) {
+    value = vertexValue.get();
+  }
+
+  @Override
+  public Iterable<Edge<LongWritable, FloatWritable>> getEdges() {
+    return new UnmodifiableLongFloatEdgeArrayIterable(neighbors, edgeWeights);
+  }
+
+  /**
+   * Returns the weight of the first edge pointing to the given vertex.
+   * @param targetVertexId Target vertex id
+   * @return Edge weight, or null if there is no such edge
+   */
+  @Override
+  public FloatWritable getEdgeValue(LongWritable targetVertexId) {
+    int idx = indexOfNeighbor(targetVertexId.get());
+    return (idx >= 0) ? new FloatWritable(edgeWeights[idx]) : null;
+  }
+
+  @Override
+  public boolean hasEdge(LongWritable targetVertexId) {
+    return indexOfNeighbor(targetVertexId.get()) >= 0;
+  }
+
+  /**
+   * Linear scan for the first neighbor with the given id.
+   * @param targetId Neighbor id to look for
+   * @return Index into {@link #neighbors}, or -1 if absent
+   */
+  private int indexOfNeighbor(long targetId) {
+    for (int i = 0; i < neighbors.length; i++) {
+      if (neighbors[i] == targetId) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  @Override
+  public int getNumEdges() {
+    return neighbors.length;
+  }
+
+  @Override
+  public void sendMessageToAllEdges(final DoubleWritable message) {
+    for (long neighbor : neighbors) {
+      sendMessage(new LongWritable(neighbor), message);
+    }
+  }
+
+  @Override
+  public Iterable<DoubleWritable> getMessages() {
+    // A fresh iterator per call, so the messages can be read repeatedly.
+    return new Iterable<DoubleWritable>() {
+      @Override
+      public Iterator<DoubleWritable> iterator() {
+        return new UnmodifiableDoubleArrayIterator(messages);
+      }
+    };
+  }
+
+  @Override
+  public void putMessages(Iterable<DoubleWritable> newMessages) {
+    // Traverses newMessages twice: once to size the array, once to copy.
+    messages = new double[Iterables.size(newMessages)];
+    int n = 0;
+    for (DoubleWritable message : newMessages) {
+      messages[n++] = message.get();
+    }
+  }
+
+  /** Drops all stored messages. */
+  @Override
+  void releaseResources() {
+    messages = new double[0];
+  }
+
+  /**
+   * Serializes the vertex as: id, value, edge count, neighbor ids, edge
+   * weights, message count, messages. Must stay in sync with
+   * {@link #readFields(DataInput)}.
+   * @param out Output to write to
+   * @throws IOException if writing fails
+   */
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeLong(id);
+    out.writeDouble(value);
+    out.writeInt(neighbors.length);
+    for (int n = 0; n < neighbors.length; n++) {
+      out.writeLong(neighbors[n]);
+    }
+    // No separate length for the weights: they parallel the neighbors.
+    for (int n = 0; n < edgeWeights.length; n++) {
+      out.writeFloat(edgeWeights[n]);
+    }
+    out.writeInt(messages.length);
+    for (int n = 0; n < messages.length; n++) {
+      out.writeDouble(messages[n]);
+    }
+  }
+
+  /**
+   * Deserializes the layout produced by {@link #write(DataOutput)}.
+   * @param in Input to read from
+   * @throws IOException if reading fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id = in.readLong();
+    value = in.readDouble();
+    int numEdges = in.readInt();
+    neighbors = new long[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      neighbors[n] = in.readLong();
+    }
+    edgeWeights = new float[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      edgeWeights[n] = in.readFloat();
+    }
+    int numMessages = in.readInt();
+    messages = new double[numMessages];
+    for (int n = 0; n < numMessages; n++) {
+      messages[n] = in.readDouble();
+    }
+  }
+
+  /** Hash based on the vertex id only, consistent with equals. */
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (int) (id ^ (id >>> 32));
+    return result;
+  }
+
+  @Override
+  public int getNumMessages() {
+    return messages.length;
+  }
+
+  /**
+   * Two vertices of this type are equal when their ids are equal.
+   * @param obj Object to compare with
+   * @return true if obj is a LongDoubleFloatDoubleEdgeListVertex with the
+   *         same id
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof LongDoubleFloatDoubleEdgeListVertex)) {
+      return false;
+    }
+    LongDoubleFloatDoubleEdgeListVertex other =
+        (LongDoubleFloatDoubleEdgeListVertex) obj;
+    if (id != other.id) {
+      return false;
+    }
+    return true;
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.giraph.utils.UnmodifiableDoubleArrayIterator;
+import org.apache.giraph.utils.UnmodifiableLongNullEdgeArrayIterable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Compact vertex representation with primitive arrays and null edges.
+ * <p>
+ * The id, value, neighbor ids and pending messages are stored as
+ * primitives. The Writable accessors box them on demand, so the objects they
+ * return are fresh copies. Edges carry no value ({@link NullWritable}).
+ */
+public abstract class LongDoubleNullDoubleVertex extends
+    Vertex<LongWritable, DoubleWritable, NullWritable, DoubleWritable> {
+
+  /** long represented vertex id */
+  private long id;
+  /** double represented vertex value */
+  private double value;
+  /** long array of neighbor vertex ids */
+  private long[] neighbors;
+  /** double array of messages */
+  private double[] messages;
+
+  /**
+   * Copies the given id, value, edges and messages into primitive storage.
+   * A null id or value leaves the current field untouched (as in
+   * LongDoubleFloatDoubleEdgeListVertex) instead of throwing a
+   * NullPointerException. Null edges or messages yield empty arrays.
+   *
+   * @param vertexId Vertex id (may be null)
+   * @param vertexValue Vertex value (may be null)
+   * @param edges Map whose keys are the neighbor ids (may be null)
+   * @param messages Initial messages (may be null)
+   */
+  @Override
+  public void initialize(LongWritable vertexId, DoubleWritable vertexValue,
+      Map<LongWritable, NullWritable> edges,
+      Iterable<DoubleWritable> messages) {
+    if (vertexId != null) {
+      id = vertexId.get();
+    }
+    if (vertexValue != null) {
+      value = vertexValue.get();
+    }
+    neighbors = new long[(edges != null) ? edges.size() : 0];
+    int n = 0;
+    if (edges != null) {
+      for (LongWritable neighbor : edges.keySet()) {
+        neighbors[n++] = neighbor.get();
+      }
+    }
+    // The messages Iterable is traversed twice: once to size the array and
+    // once to copy the values.
+    this.messages =
+        new double[(messages != null) ? Iterables.size(messages) : 0];
+    if (messages != null) {
+      n = 0;
+      for (DoubleWritable message : messages) {
+        this.messages[n++] = message.get();
+      }
+    }
+  }
+
+  @Override
+  public LongWritable getId() {
+    return new LongWritable(id);
+  }
+
+  @Override
+  public DoubleWritable getValue() {
+    return new DoubleWritable(value);
+  }
+
+  @Override
+  public void setValue(DoubleWritable vertexValue) {
+    value = vertexValue.get();
+  }
+
+  @Override
+  public Iterable<Edge<LongWritable, NullWritable>> getEdges() {
+    return new UnmodifiableLongNullEdgeArrayIterable(neighbors);
+  }
+
+  /**
+   * Returns the (empty) value of the edge to the given vertex.
+   * @param targetVertexId Target vertex id
+   * @return {@link NullWritable#get()} if the edge exists, null otherwise
+   *         (matching LongDoubleFloatDoubleEdgeListVertex, which also returns
+   *         null for a missing edge)
+   */
+  @Override
+  public NullWritable getEdgeValue(LongWritable targetVertexId) {
+    return hasEdge(targetVertexId) ? NullWritable.get() : null;
+  }
+
+  @Override
+  public boolean hasEdge(LongWritable targetVertexId) {
+    for (long neighbor : neighbors) {
+      if (neighbor == targetVertexId.get()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int getNumEdges() {
+    return neighbors.length;
+  }
+
+  @Override
+  public void sendMessageToAllEdges(final DoubleWritable message) {
+    for (long neighbor : neighbors) {
+      sendMessage(new LongWritable(neighbor), message);
+    }
+  }
+
+  @Override
+  public Iterable<DoubleWritable> getMessages() {
+    // A fresh iterator per call, so the messages can be read repeatedly.
+    return new Iterable<DoubleWritable>() {
+      @Override
+      public Iterator<DoubleWritable> iterator() {
+        return new UnmodifiableDoubleArrayIterator(messages);
+      }
+    };
+  }
+
+  @Override
+  public void putMessages(Iterable<DoubleWritable> newMessages) {
+    // Traverses newMessages twice: once to size the array, once to copy.
+    messages = new double[Iterables.size(newMessages)];
+    int n = 0;
+    for (DoubleWritable message : newMessages) {
+      messages[n++] = message.get();
+    }
+  }
+
+  /** Drops all stored messages. */
+  @Override
+  void releaseResources() {
+    messages = new double[0];
+  }
+
+  /**
+   * Serializes the vertex as: id, value, edge count, neighbor ids, message
+   * count, messages. Must stay in sync with {@link #readFields(DataInput)}.
+   * @param out Output to write to
+   * @throws IOException if writing fails
+   */
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeLong(id);
+    out.writeDouble(value);
+    out.writeInt(neighbors.length);
+    for (int n = 0; n < neighbors.length; n++) {
+      out.writeLong(neighbors[n]);
+    }
+    out.writeInt(messages.length);
+    for (int n = 0; n < messages.length; n++) {
+      out.writeDouble(messages[n]);
+    }
+  }
+
+  /**
+   * Deserializes the layout produced by {@link #write(DataOutput)}.
+   * @param in Input to read from
+   * @throws IOException if reading fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id = in.readLong();
+    value = in.readDouble();
+    int numEdges = in.readInt();
+    neighbors = new long[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      neighbors[n] = in.readLong();
+    }
+    int numMessages = in.readInt();
+    messages = new double[numMessages];
+    for (int n = 0; n < numMessages; n++) {
+      messages[n] = in.readDouble();
+    }
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1383115&r1=1383114&r2=1383115&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Mon Sep 10 21:32:14 2012
@@ -18,9 +18,20 @@
 
 package org.apache.giraph.utils;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.MasterCompute;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.WorkerContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -29,12 +40,8 @@ import org.apache.zookeeper.server.Serve
 import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
 
 /**
  * A base class for running internal tests on a vertex
@@ -68,8 +75,11 @@ public class InternalVertexRunner {
    * @return linewise output data
    * @throws Exception
    */
-  public static Iterable<String> run(Class<?> vertexClass,
-      Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
+  @SuppressWarnings("rawtypes")
+  public static Iterable<String> run(
+      Class<? extends Vertex> vertexClass,
+      Class<? extends VertexInputFormat> vertexInputFormatClass,
+      Class<? extends VertexOutputFormat> vertexOutputFormatClass,
       Map<String, String> params, String... data) throws Exception {
     return run(vertexClass, null, vertexInputFormatClass,
         vertexOutputFormatClass, params, data);
@@ -89,10 +99,42 @@ public class InternalVertexRunner {
    * @return linewise output data
    * @throws Exception
    */
-  public static Iterable<String> run(Class<?> vertexClass,
-      Class<?> vertexCombinerClass, Class<?> vertexInputFormatClass,
-      Class<?> vertexOutputFormatClass, Map<String, String> params,
+  @SuppressWarnings("rawtypes")
+  public static Iterable<String> run(
+      Class<? extends Vertex> vertexClass,
+      Class<? extends VertexCombiner> vertexCombinerClass,
+      Class<? extends VertexInputFormat> vertexInputFormatClass,
+      Class<? extends VertexOutputFormat> vertexOutputFormatClass,
+      Map<String, String> params,
       String... data) throws Exception {
+    return InternalVertexRunner.run(vertexClass, vertexCombinerClass,
+        vertexInputFormatClass, vertexOutputFormatClass, null, null, params,
+        data);
+  }
+
+  /**
+   * Attempts to run the vertex internally in the current JVM, reading from and
+   * writing to a temporary folder on local disk. Will start its own zookeeper
+   * instance.
+   * @param vertexClass the vertex class to instantiate
+   * @param vertexCombinerClass the vertex combiner to use (or null)
+   * @param vertexInputFormatClass the inputformat to use
+   * @param vertexOutputFormatClass the outputformat to use
+   * @param workerContextClass the worker context to use
+   * @param masterComputeClass the master compute class to use
+   * @param params a map of parameters to add to the hadoop configuration
+   * @param data linewise input data
+   * @return linewise output data
+   * @throws Exception
+   */
+  @SuppressWarnings("rawtypes")
+  public static Iterable<String> run(Class<? extends Vertex> vertexClass,
+      Class<? extends VertexCombiner> vertexCombinerClass,
+      Class<? extends VertexInputFormat> vertexInputFormatClass,
+      Class<? extends VertexOutputFormat> vertexOutputFormatClass,
+      Class<? extends WorkerContext> workerContextClass,
+      Class<? extends MasterCompute> masterComputeClass,
+      Map<String, String> params, String... data) throws Exception {
 
     File tmpDir = null;
     try {
@@ -112,10 +154,15 @@ public class InternalVertexRunner {
       job.setVertexClass(vertexClass);
       job.setVertexInputFormatClass(vertexInputFormatClass);
       job.setVertexOutputFormatClass(vertexOutputFormatClass);
-
+      if (workerContextClass != null) {
+        job.setWorkerContextClass(workerContextClass);
+      }
       if (vertexCombinerClass != null) {
         job.setVertexCombinerClass(vertexCombinerClass);
       }
+      if (masterComputeClass != null) {
+        job.setMasterComputeClass(masterComputeClass);
+      }
 
       job.setWorkerConfiguration(1, 1, 100.0f);
       Configuration conf = job.getConfiguration();
@@ -184,8 +231,6 @@ public class InternalVertexRunner {
     }
   }
 
-
-
   /**
    * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
    */

Added: giraph/trunk/src/main/java/org/apache/giraph/utils/MathUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/MathUtils.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/MathUtils.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/MathUtils.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+import java.util.Iterator;
+
+/**
+ * A helper class for math related operations with writables
+ */
+public class MathUtils {
+
+  /**
+   * Utility classes cannot be instantiated
+   */
+  private MathUtils() {
+  }
+
+  /**
+   * Sums up a sequence of double values
+   * @param values double values
+   * @return sum of double values
+   */
+  public static double sum(Iterable<DoubleWritable> values) {
+    return sum(values.iterator());
+  }
+
+  /**
+   * Sums up a sequence of double values
+   * @param values double values
+   * @return sum of double values
+   */
+  public static double sum(Iterator<DoubleWritable> values) {
+    double sum = 0;
+    while (values.hasNext()) {
+      sum += values.next().get();
+    }
+    return sum;
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * {@link UnmodifiableIterator} over a primitive double array. Each call to
+ * {@link #next()} boxes the current element into a new {@link DoubleWritable}.
+ * The array is not copied, so later writes to it are visible to the iterator.
+ */
+public class UnmodifiableDoubleArrayIterator extends
+    UnmodifiableIterator<DoubleWritable> {
+  /** Array to iterate over */
+  private final double[] doubleArray;
+  /** Offset to array */
+  private int offset;
+
+  /**
+   * Constructor with array to iterate over.
+   * @param doubleArray Array to iterate over.
+   */
+  public UnmodifiableDoubleArrayIterator(double[] doubleArray) {
+    this.doubleArray = doubleArray;
+    offset = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return offset < doubleArray.length;
+  }
+
+  /**
+   * Returns the next element.
+   * @return Next element, boxed
+   * @throws java.util.NoSuchElementException if the array is exhausted, as
+   *         required by the Iterator contract
+   */
+  @Override
+  public DoubleWritable next() {
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException(
+          "No more elements: array length is " + doubleArray.length);
+    }
+    return new DoubleWritable(doubleArray[offset++]);
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.LongWritable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * {@link UnmodifiableIterator} over a primitive long array. Each call to
+ * {@link #next()} boxes the current element into a new {@link LongWritable}.
+ * The array is not copied, so later writes to it are visible to the iterator.
+ */
+public class UnmodifiableLongArrayIterator extends
+    UnmodifiableIterator<LongWritable> {
+  /** Array to iterate over */
+  private final long[] longArray;
+  /** Offset to array */
+  private int offset;
+
+  /**
+   * Constructor with array to iterate over.
+   * @param longArray Array to iterate over.
+   */
+  public UnmodifiableLongArrayIterator(long[] longArray) {
+    this.longArray = longArray;
+    offset = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return offset < longArray.length;
+  }
+
+  /**
+   * Returns the next element.
+   * @return Next element, boxed
+   * @throws java.util.NoSuchElementException if the array is exhausted, as
+   *         required by the Iterator contract
+   */
+  @Override
+  public LongWritable next() {
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException(
+          "No more elements: array length is " + longArray.length);
+    }
+    return new LongWritable(longArray[offset++]);
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * {@link UnmodifiableIterator} over a pair of parallel primitive long-float
+ * arrays. It produces one {@link Edge} per index: the target id comes from
+ * the long array and the edge value from the float array.
+ * <p>
+ * The object is also an {@link Iterable}. Each call to {@link #iterator()}
+ * returns an independent iterator starting at the first edge, so the edges
+ * can be traversed more than once.
+ */
+public class UnmodifiableLongFloatEdgeArrayIterable extends
+    UnmodifiableIterator<Edge<LongWritable, FloatWritable>> implements
+    Iterable<Edge<LongWritable, FloatWritable>> {
+  /** Array of IDs to iterate over */
+  private final long[] longArray;
+  /** Array of weights to iterate over, parallel to longArray */
+  private final float[] floatArray;
+  /** Offset to array */
+  private int offset;
+
+  /**
+   * Constructor with arrays to iterate over. The arrays are not copied.
+   * Iteration length is driven by longArray, so floatArray is expected to
+   * have at least as many entries.
+   * @param longArray Array of IDs to iterate over.
+   * @param floatArray Array of weights to iterate over.
+   */
+  public UnmodifiableLongFloatEdgeArrayIterable(final long[] longArray,
+      final float[] floatArray) {
+    this.longArray = longArray;
+    this.floatArray = floatArray;
+    offset = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return offset < longArray.length;
+  }
+
+  /**
+   * Returns the next edge.
+   * @return Edge built from the current id and weight
+   * @throws java.util.NoSuchElementException if all edges were returned, as
+   *         required by the Iterator contract
+   */
+  @Override
+  public Edge<LongWritable, FloatWritable> next() {
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException(
+          "No more edges: array length is " + longArray.length);
+    }
+    Edge<LongWritable, FloatWritable> retval =
+        new Edge<LongWritable, FloatWritable>(new LongWritable(
+            longArray[offset]), new FloatWritable(floatArray[offset]));
+    offset++;
+    return retval;
+  }
+
+  /**
+   * Returns a new iterator over the same arrays, starting at the first edge.
+   * Returning {@code this} would make every traversal after the first one
+   * (e.g. counting the edges, then visiting them) see no edges at all.
+   * @return Independent iterator over the edges
+   */
+  @Override
+  public Iterator<Edge<LongWritable, FloatWritable>> iterator() {
+    return new UnmodifiableLongFloatEdgeArrayIterable(longArray, floatArray);
+  }
+}

Added: giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * {@link UnmodifiableIterator} over a primitive long array of vertex IDs,
+ * yielding edges whose value is always {@link NullWritable#get()}.
+ *
+ * <p>This object is its own {@link Iterable}: {@link #iterator()} returns
+ * {@code this}, so the iteration state is shared and it can be traversed
+ * only once.
+ */
+public class UnmodifiableLongNullEdgeArrayIterable extends
+    UnmodifiableIterator<Edge<LongWritable, NullWritable>> implements
+    Iterable<Edge<LongWritable, NullWritable>> {
+  /** Array of IDs to iterate over, one per edge */
+  private final long[] longArray;
+  /** Index of the next element to return */
+  private int offset;
+
+  /**
+   * Constructor with array to iterate over.
+   * @param longArray Array of IDs to iterate over.
+   */
+  public UnmodifiableLongNullEdgeArrayIterable(final long[] longArray) {
+    this.longArray = longArray;
+    offset = 0;
+  }
+
+  /**
+   * Returns this object itself. Because the iteration state is shared, the
+   * returned iterator is single-use.
+   * @return This iterator.
+   */
+  @Override
+  public Iterator<Edge<LongWritable, NullWritable>> iterator() {
+    return this;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return offset < longArray.length;
+  }
+
+  /**
+   * Returns the edge at the current offset and advances by one.
+   * @return A newly allocated edge built from the ID at the current offset
+   *         and the {@link NullWritable} singleton.
+   * @throws java.util.NoSuchElementException if all edges were returned
+   */
+  @Override
+  public Edge<LongWritable, NullWritable> next() {
+    if (!hasNext()) {
+      // Iterator contract: signal exhaustion with NoSuchElementException
+      // instead of leaking an ArrayIndexOutOfBoundsException.
+      throw new java.util.NoSuchElementException(
+          "No more edges: offset " + offset);
+    }
+    Edge<LongWritable, NullWritable> retval =
+        new Edge<LongWritable, NullWritable>(
+            new LongWritable(longArray[offset]), NullWritable.get());
+    offset++;
+    return retval;
+  }
+}

Added: giraph/trunk/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java?rev=1383115&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java (added)
+++ giraph/trunk/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java Mon Sep 10 21:32:14 2012
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+
+import org.apache.giraph.examples.RandomWalkVertex.
+  RandomWalkVertexMasterCompute;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for {@link RandomWalkWithRestartVertex}
+ */
+public class RandomWalkWithRestartVertexTest {
+
+  /**
+   * Absolute tolerance used when comparing computed probabilities with the
+   * reference values. Note that 10e-3 equals 0.01.
+   */
+  private static final double EPSILON = 10e-3;
+
+  /**
+   * A local integration test on toy data: an unweighted graph, restarting
+   * at vertex 12 with teleportation probability 0.25.
+   */
+  @Test
+  public void testToyData() throws Exception {
+    // A small graph
+    String[] graph = new String[] { "12 34 56", "34 78", "56 34 78", "78 34" };
+
+    Map<String, String> params = Maps.newHashMap();
+    params.put(RandomWalkWithRestartVertex.SOURCE_VERTEX, "12");
+    params.put(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, "30");
+    params.put(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, "0.25");
+
+    // Run internally
+    Iterable<String> results =
+        InternalVertexRunner.run(RandomWalkWithRestartVertex.class, null,
+            LongDoubleFloatDoubleTextInputFormat.class,
+            VertexWithDoubleValueFloatEdgeTextOutputFormat.class,
+            RandomWalkWorkerContext.class, RandomWalkVertexMasterCompute.class,
+            params, graph);
+
+    Map<Long, Double> steadyStateProbabilities =
+        parseSteadyStateProbabilities(results);
+    // Reference values computed with external software
+    assertProbability(steadyStateProbabilities, 12L, 0.25);
+    assertProbability(steadyStateProbabilities, 34L, 0.354872);
+    assertProbability(steadyStateProbabilities, 56L, 0.09375);
+    assertProbability(steadyStateProbabilities, 78L, 0.301377);
+  }
+
+  /**
+   * A local integration test on a small weighted graph, read with
+   * {@link NormalizingLongDoubleFloatDoubleTextInputFormat}, restarting at
+   * vertex 12 with teleportation probability 0.15.
+   */
+  @Test
+  public void testWeightedGraph() throws Exception {
+    // A small weighted graph ("target:weight" edge tokens)
+    String[] graph =
+        new String[] { "12 34:0.1 56:0.9", "34 78:0.9 56:0.1",
+          "56 12:0.1 34:0.8 78:0.1", "78 34:1.0" };
+
+    Map<String, String> params = Maps.newHashMap();
+    params.put(RandomWalkWithRestartVertex.SOURCE_VERTEX, "12");
+    params.put(RandomWalkWithRestartVertex.MAX_SUPERSTEPS, "30");
+    params.put(RandomWalkWithRestartVertex.TELEPORTATION_PROBABILITY, "0.15");
+
+    // Run internally
+    Iterable<String> results =
+        InternalVertexRunner.run(RandomWalkWithRestartVertex.class, null,
+            NormalizingLongDoubleFloatDoubleTextInputFormat.class,
+            VertexWithDoubleValueFloatEdgeTextOutputFormat.class,
+            RandomWalkWorkerContext.class, RandomWalkVertexMasterCompute.class,
+            params, graph);
+
+    Map<Long, Double> steadyStateProbabilities =
+        parseSteadyStateProbabilities(results);
+    // Reference values computed with external software
+    assertProbability(steadyStateProbabilities, 12L, 0.163365);
+    assertProbability(steadyStateProbabilities, 34L, 0.378932);
+    assertProbability(steadyStateProbabilities, 56L, 0.156886);
+    assertProbability(steadyStateProbabilities, 78L, 0.300816);
+  }
+
+  /**
+   * Asserts that the steady state probability of a vertex is within
+   * {@link #EPSILON} of the expected value.
+   * @param probabilities Parsed steady state probabilities, keyed by
+   *        vertex ID.
+   * @param vertexId ID of the vertex to check.
+   * @param expected Expected steady state probability.
+   */
+  private static void assertProbability(Map<Long, Double> probabilities,
+      long vertexId, double expected) {
+    Double actual = probabilities.get(vertexId);
+    if (actual == null) {
+      // Report the missing vertex instead of failing with an NPE when the
+      // null value is unboxed for the double comparison.
+      throw new AssertionError("No output for vertex " + vertexId);
+    }
+    assertEquals("Steady state probability of vertex " + vertexId,
+        expected, actual.doubleValue(), EPSILON);
+  }
+
+  /**
+   * Parse steady state probabilities.
+   * @param results The steady state probabilities in text format, one
+   *        tab-separated "id value" line per vertex.
+   * @return A map representation of the steady state probabilities.
+   */
+  private Map<Long, Double> parseSteadyStateProbabilities(
+      Iterable<String> results) {
+    Map<Long, Double> result = Maps.newHashMap();
+    for (String s : results) {
+      String[] tokens = s.split("\\t");
+      long id = Long.parseLong(tokens[0]);
+      double value = Double.parseDouble(tokens[1]);
+      result.put(id, value);
+    }
+    return result;
+  }
+}



Mime
View raw message