giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apre...@apache.org
Subject git commit: updated refs/heads/trunk to fa6e3d5
Date Tue, 06 Aug 2013 17:32:52 GMT
Updated Branches:
  refs/heads/trunk d419f8f4f -> fa6e3d5a8


GIRAPH-728: Efficient matrix aggregators (herald via apresta)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fa6e3d5a
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fa6e3d5a
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fa6e3d5a

Branch: refs/heads/trunk
Commit: fa6e3d5a8b25616e31cf355f2cb5a7001f4c8f64
Parents: d419f8f
Author: Alessandro Presta <alessandro@fb.com>
Authored: Tue Aug 6 10:31:59 2013 -0700
Committer: Alessandro Presta <alessandro@fb.com>
Committed: Tue Aug 6 10:31:59 2013 -0700

----------------------------------------------------------------------
 .../giraph/aggregators/matrix/DoubleMatrix.java | 104 ++++++++++++++++
 .../matrix/DoubleMatrixSumAggregator.java       | 101 +++++++++++++++
 .../giraph/aggregators/matrix/DoubleVector.java | 123 +++++++++++++++++++
 .../matrix/DoubleVectorSumAggregator.java       |  37 ++++++
 .../giraph/aggregators/matrix/FloatMatrix.java  | 104 ++++++++++++++++
 .../matrix/FloatMatrixSumAggregator.java        | 100 +++++++++++++++
 .../giraph/aggregators/matrix/FloatVector.java  | 123 +++++++++++++++++++
 .../matrix/FloatVectorSumAggregator.java        |  37 ++++++
 .../giraph/aggregators/matrix/IntMatrix.java    | 104 ++++++++++++++++
 .../matrix/IntMatrixSumAggregator.java          | 100 +++++++++++++++
 .../giraph/aggregators/matrix/IntVector.java    | 123 +++++++++++++++++++
 .../matrix/IntVectorSumAggregator.java          |  37 ++++++
 .../giraph/aggregators/matrix/LongMatrix.java   | 104 ++++++++++++++++
 .../matrix/LongMatrixSumAggregator.java         | 100 +++++++++++++++
 .../giraph/aggregators/matrix/LongVector.java   | 123 +++++++++++++++++++
 .../matrix/LongVectorSumAggregator.java         |  37 ++++++
 .../aggregators/matrix/MatrixSumAggregator.java |  51 ++++++++
 .../giraph/aggregators/matrix/package-info.java |  21 ++++
 .../aggregators/matrix/TestDoubleMatrix.java    |  74 +++++++++++
 .../aggregators/matrix/TestFloatMatrix.java     |  74 +++++++++++
 .../aggregators/matrix/TestIntMatrix.java       |  73 +++++++++++
 .../aggregators/matrix/TestLongMatrix.java      |  73 +++++++++++
 22 files changed, 1823 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrix.java
new file mode 100644
index 0000000..d86dc4b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrix.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+/**
+ * A double matrix holds the values of the entries in double vectors. It keeps
+ * one double vector per matrix row.
+ */
+public class DoubleMatrix {
+  /** The number of rows in the matrix */
+  private int numRows;
+  /** The rows of the matrix, keyed by row index */
+  private Int2ObjectOpenHashMap<DoubleVector> rows;
+
+  /**
+   * Create a new matrix with the given number of rows; rows start out unset.
+   *
+   * @param numRows the number of rows.
+   */
+  public DoubleMatrix(int numRows) {
+    this.numRows = numRows;
+    rows = new Int2ObjectOpenHashMap<DoubleVector>(numRows);
+    rows.defaultReturnValue(null);
+  }
+
+  /**
+   * Reset the matrix: every row becomes a fresh, empty (all-zero) vector.
+   */
+  public void initialize() {
+    rows.clear();
+    for (int i = 0; i < numRows; ++i) {
+      setRow(i, new DoubleVector());
+    }
+  }
+
+  /**
+   * Get the number of rows in the matrix.
+   *
+   * @return the number of rows.
+   */
+  public int getNumRows() {
+    return numRows;
+  }
+
+  /**
+   * Get a specific entry of the matrix. The row must already be set.
+   *
+   * @param i the row
+   * @param j the column
+   * @return the value of the entry
+   */
+  public double get(int i, int j) {
+    return rows.get(i).get(j);
+  }
+
+  /**
+   * Set a specific entry of the matrix. The row must already be set.
+   *
+   * @param i the row
+   * @param j the column
+   * @param v the value of the entry
+   */
+  public void set(int i, int j, double v) {
+    rows.get(i).set(j, v);
+  }
+
+  /**
+   * Get a specific row of the matrix.
+   *
+   * @param i the row number
+   * @return the row of the matrix, or null if it has not been set
+   */
+  DoubleVector getRow(int i) {
+    return rows.get(i);
+  }
+
+  /**
+   * Set the double vector as the row specified.
+   *
+   * @param i the row
+   * @param vec the vector to set as the row
+   */
+  void setRow(int i, DoubleVector vec) {
+    rows.put(i, vec);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrixSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrixSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrixSumAggregator.java
new file mode 100644
index 0000000..0a1dafb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrixSumAggregator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.AggregatorUsage;
+import org.apache.giraph.master.MasterAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+
+/**
+ * The double matrix aggregator is used to register and aggregate double
+ * matrices, using one vector aggregator per matrix row.
+ */
+public class DoubleMatrixSumAggregator extends MatrixSumAggregator {
+  /** Reusable sparse vector holding the single entry being aggregated */
+  private DoubleVector singletonVector = new DoubleVector();
+
+  /**
+   * Create a new matrix aggregator with the given prefix name for the vector
+   * aggregators.
+   *
+   * @param name the prefix for the row vector aggregators
+   */
+  public DoubleMatrixSumAggregator(String name) {
+    super(name);
+  }
+
+  /**
+   * Register the double vector aggregators, one for each row of the matrix.
+   *
+   * @param numRows the number of rows
+   * @param master the master on which the row aggregators are registered
+   */
+  public void register(int numRows, MasterAggregatorUsage master)
+    throws InstantiationException, IllegalAccessException {
+    for (int i = 0; i < numRows; ++i) {
+      master.registerAggregator(getRowAggregatorName(i),
+          DoubleVectorSumAggregator.class);
+    }
+  }
+
+  /**
+   * Add the given value to the entry specified, reusing singletonVector.
+   *
+   * @param i the row
+   * @param j the column
+   * @param v the value
+   * @param worker the worker to aggregate
+   */
+  public void aggregate(int i, int j, double v, WorkerAggregatorUsage worker) {
+    singletonVector.clear();
+    singletonVector.set(j, v);
+    worker.aggregate(getRowAggregatorName(i), singletonVector);
+  }
+
+  /**
+   * Set the values of the matrix to the master specified. This is typically
+   * used in the master, to build an external DoubleMatrix and only set it at
+   * the end.
+   *
+   * @param matrix the matrix to set the values
+   * @param master the master
+   */
+  public void setMatrix(DoubleMatrix matrix, MasterAggregatorUsage master) {
+    int numRows = matrix.getNumRows();
+    for (int i = 0; i < numRows; ++i) {
+      master.setAggregatedValue(getRowAggregatorName(i), matrix.getRow(i));
+    }
+  }
+
+  /**
+   * Read the aggregated values of the matrix, one row aggregator at a time.
+   *
+   * @param numRows the number of rows
+   * @param aggUser the master or worker
+   * @return a new double matrix holding the aggregated rows
+   */
+  public DoubleMatrix getMatrix(int numRows, AggregatorUsage aggUser) {
+    DoubleMatrix matrix = new DoubleMatrix(numRows);
+    for (int i = 0; i < numRows; ++i) {
+      DoubleVector vec = aggUser.getAggregatedValue(getRowAggregatorName(i));
+      matrix.setRow(i, vec);
+    }
+    return matrix;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVector.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVector.java
new file mode 100644
index 0000000..288be93
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVector.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import it.unimi.dsi.fastutil.ints.Int2DoubleOpenHashMap;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A sparse double vector; it holds the values of a particular matrix row.
+ */
+public class DoubleVector implements Writable {
+  /**
+   * Sparse entries of the vector, stored as (index, value) pairs
+   */
+  private Int2DoubleOpenHashMap entries = null;
+
+  /**
+   * Create a new vector with the default initial size of the backing map.
+   */
+  public DoubleVector() {
+    initialize(Int2DoubleOpenHashMap.DEFAULT_INITIAL_SIZE);
+  }
+
+  /**
+   * Create a new vector whose backing map is sized for the given size.
+   *
+   * @param size the size passed to the backing hash map
+   */
+  public DoubleVector(int size) {
+    initialize(size);
+  }
+
+  /**
+   * (Re)create the backing map. Entries that are not set read as 0.0
+   *
+   * @param size the size passed to the backing hash map
+   */
+  private void initialize(int size) {
+    entries = new Int2DoubleOpenHashMap(size);
+    entries.defaultReturnValue(0.0f);
+  }
+
+  /**
+   * Get a particular entry of the vector.
+   *
+   * @param i the entry
+   * @return the value of the entry, or 0.0 if the entry was never set.
+   */
+  double get(int i) {
+    return entries.get(i);
+  }
+
+  /**
+   * Set the given value to the entry specified, replacing any previous value.
+   *
+   * @param i the entry
+   * @param value the value to set to the entry
+   */
+  void set(int i, double value) {
+    entries.put(i, value);
+  }
+
+  /**
+   * Clear the contents of the vector.
+   */
+  void clear() {
+    entries.clear();
+  }
+
+  /**
+   * Add the vector specified. This is a vector addition that does an
+   * element-by-element addition over the entries present in the other vector.
+   *
+   * @param other the vector to add.
+   */
+  void add(DoubleVector other) {
+    for (Entry<Integer, Double> entry : other.entries.entrySet()) {
+      entries.addTo(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(entries.size());
+    for (Entry<Integer, Double> entry : entries.entrySet()) {
+      out.writeInt(entry.getKey());
+      out.writeDouble(entry.getValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int size = in.readInt();
+    initialize(size);
+    for (int i = 0; i < size; ++i) {
+      int row = in.readInt();
+      double value = in.readDouble();
+      entries.put(row, value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVectorSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVectorSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVectorSumAggregator.java
new file mode 100644
index 0000000..3318554
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVectorSumAggregator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.BasicAggregator;
+
+/**
+ * Aggregator that sums double vectors by adding each one to its value.
+ */
+public class DoubleVectorSumAggregator extends BasicAggregator<DoubleVector> {
+
+  @Override
+  public DoubleVector createInitialValue() {
+    return new DoubleVector();
+  }
+
+  @Override
+  public void aggregate(DoubleVector vector) {
+    getAggregatedValue().add(vector);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrix.java
new file mode 100644
index 0000000..67bad5c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrix.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+/**
+ * A float matrix holds the values of the entries in float vectors. It keeps one
+ * float vector per matrix row.
+ */
+public class FloatMatrix {
+  /** The number of rows in the matrix */
+  private int numRows;
+  /** The rows of the matrix, keyed by row index */
+  private Int2ObjectOpenHashMap<FloatVector> rows;
+
+  /**
+   * Create a new matrix with the given number of rows; rows start out unset.
+   *
+   * @param numRows the number of rows.
+   */
+  public FloatMatrix(int numRows) {
+    this.numRows = numRows;
+    rows = new Int2ObjectOpenHashMap<FloatVector>(numRows);
+    rows.defaultReturnValue(null);
+  }
+
+  /**
+   * Reset the matrix: every row becomes a fresh, empty (all-zero) vector.
+   */
+  public void initialize() {
+    rows.clear();
+    for (int i = 0; i < numRows; ++i) {
+      setRow(i, new FloatVector());
+    }
+  }
+
+  /**
+   * Get the number of rows in the matrix.
+   *
+   * @return the number of rows.
+   */
+  public int getNumRows() {
+    return numRows;
+  }
+
+  /**
+   * Get a specific entry of the matrix. The row must already be set.
+   *
+   * @param i the row
+   * @param j the column
+   * @return the value of the entry
+   */
+  public float get(int i, int j) {
+    return rows.get(i).get(j);
+  }
+
+  /**
+   * Set a specific entry of the matrix. The row must already be set.
+   *
+   * @param i the row
+   * @param j the column
+   * @param v the value of the entry
+   */
+  public void set(int i, int j, float v) {
+    rows.get(i).set(j, v);
+  }
+
+  /**
+   * Get a specific row of the matrix.
+   *
+   * @param i the row number
+   * @return the row of the matrix, or null if it has not been set
+   */
+  FloatVector getRow(int i) {
+    return rows.get(i);
+  }
+
+  /**
+   * Set the float vector as the row specified.
+   *
+   * @param i the row
+   * @param vec the vector to set as the row
+   */
+  void setRow(int i, FloatVector vec) {
+    rows.put(i, vec);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrixSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrixSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrixSumAggregator.java
new file mode 100644
index 0000000..54406ed
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrixSumAggregator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.AggregatorUsage;
+import org.apache.giraph.master.MasterAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+
+/**
+ * Registers and aggregates float matrices, one vector aggregator per row.
+ */
+public class FloatMatrixSumAggregator extends MatrixSumAggregator {
+  /** Reusable sparse vector holding the single entry being aggregated */
+  private FloatVector singletonVector = new FloatVector();
+
+  /**
+   * Create a new matrix aggregator with the given prefix name for the vector
+   * aggregators.
+   *
+   * @param name the prefix for the row vector aggregators
+   */
+  public FloatMatrixSumAggregator(String name) {
+    super(name);
+  }
+
+  /**
+   * Register the float vector aggregators, one for each row of the matrix.
+   *
+   * @param numRows the number of rows
+   * @param master the master on which the row aggregators are registered
+   */
+  public void register(int numRows, MasterAggregatorUsage master)
+    throws InstantiationException, IllegalAccessException {
+    for (int i = 0; i < numRows; ++i) {
+      master.registerAggregator(getRowAggregatorName(i),
+          FloatVectorSumAggregator.class);
+    }
+  }
+
+  /**
+   * Add the given value to the entry specified, reusing singletonVector.
+   *
+   * @param i the row
+   * @param j the column
+   * @param v the value
+   * @param worker the worker to aggregate
+   */
+  public void aggregate(int i, int j, float v, WorkerAggregatorUsage worker) {
+    singletonVector.clear();
+    singletonVector.set(j, v);
+    worker.aggregate(getRowAggregatorName(i), singletonVector);
+  }
+
+  /**
+   * Set the values of the matrix to the master specified. This is typically
+   * used in the master, to build an external FloatMatrix and only set it at
+   * the end.
+   *
+   * @param matrix the matrix to set the values
+   * @param master the master
+   */
+  public void setMatrix(FloatMatrix matrix, MasterAggregatorUsage master) {
+    int numRows = matrix.getNumRows();
+    for (int i = 0; i < numRows; ++i) {
+      master.setAggregatedValue(getRowAggregatorName(i), matrix.getRow(i));
+    }
+  }
+
+  /**
+   * Read the aggregated values of the matrix, one row aggregator at a time.
+   *
+   * @param numRows the number of rows
+   * @param aggUser the master or worker
+   * @return a new float matrix holding the aggregated rows
+   */
+  public FloatMatrix getMatrix(int numRows, AggregatorUsage aggUser) {
+    FloatMatrix matrix = new FloatMatrix(numRows);
+    for (int i = 0; i < numRows; ++i) {
+      FloatVector vec = aggUser.getAggregatedValue(getRowAggregatorName(i));
+      matrix.setRow(i, vec);
+    }
+    return matrix;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVector.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVector.java
new file mode 100644
index 0000000..6efe81e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVector.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import it.unimi.dsi.fastutil.ints.Int2FloatOpenHashMap;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A sparse float vector; it holds the values of a particular matrix row.
+ */
+public class FloatVector implements Writable {
+  /**
+   * Sparse entries of the vector, stored as (index, value) pairs
+   */
+  private Int2FloatOpenHashMap entries = null;
+
+  /**
+   * Create a new vector with the default initial size of the backing map.
+   */
+  public FloatVector() {
+    initialize(Int2FloatOpenHashMap.DEFAULT_INITIAL_SIZE);
+  }
+
+  /**
+   * Create a new vector whose backing map is sized for the given size.
+   *
+   * @param size the size passed to the backing hash map
+   */
+  public FloatVector(int size) {
+    initialize(size);
+  }
+
+  /**
+   * (Re)create the backing map. Entries that are not set read as 0.0
+   *
+   * @param size the size passed to the backing hash map
+   */
+  private void initialize(int size) {
+    entries = new Int2FloatOpenHashMap(size);
+    entries.defaultReturnValue(0.0f);
+  }
+
+  /**
+   * Get a particular entry of the vector.
+   *
+   * @param i the entry
+   * @return the value of the entry, or 0.0 if the entry was never set.
+   */
+  float get(int i) {
+    return entries.get(i);
+  }
+
+  /**
+   * Set the given value to the entry specified, replacing any previous value.
+   *
+   * @param i the entry
+   * @param value the value to set to the entry
+   */
+  void set(int i, float value) {
+    entries.put(i, value);
+  }
+
+  /**
+   * Clear the contents of the vector.
+   */
+  void clear() {
+    entries.clear();
+  }
+
+  /**
+   * Add the vector specified. This is a vector addition that does an
+   * element-by-element addition over the entries present in the other vector.
+   *
+   * @param other the vector to add.
+   */
+  void add(FloatVector other) {
+    for (Entry<Integer, Float> entry : other.entries.entrySet()) {
+      entries.addTo(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(entries.size());
+    for (Entry<Integer, Float> entry : entries.entrySet()) {
+      out.writeInt(entry.getKey());
+      out.writeFloat(entry.getValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int size = in.readInt();
+    initialize(size);
+    for (int i = 0; i < size; ++i) {
+      int row = in.readInt();
+      float value = in.readFloat();
+      entries.put(row, value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVectorSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVectorSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVectorSumAggregator.java
new file mode 100644
index 0000000..b152395
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVectorSumAggregator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.BasicAggregator;
+
+/**
+ * Aggregator that sums float vectors by adding each one to its value.
+ */
+public class FloatVectorSumAggregator extends BasicAggregator<FloatVector> {
+
+  @Override
+  public FloatVector createInitialValue() {
+    return new FloatVector();
+  }
+
+  @Override
+  public void aggregate(FloatVector vector) {
+    getAggregatedValue().add(vector);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrix.java
new file mode 100644
index 0000000..624c793
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrix.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+/**
+ * An int matrix holds the values of the entries in int vectors. It keeps one
+ * int vector per matrix row.
+ */
+public class IntMatrix {
+  /** The number of rows in the matrix */
+  private int numRows;
+  /** The rows of the matrix, keyed by row index */
+  private Int2ObjectOpenHashMap<IntVector> rows;
+
+  /**
+   * Create a new matrix with the given number of rows; rows start out unset.
+   *
+   * @param numRows the number of rows.
+   */
+  public IntMatrix(int numRows) {
+    this.numRows = numRows;
+    rows = new Int2ObjectOpenHashMap<IntVector>(numRows);
+    rows.defaultReturnValue(null);
+  }
+
+  /**
+   * Reset the matrix: every row is replaced by a freshly created vector.
+   */
+  public void initialize() {
+    rows.clear();
+    for (int i = 0; i < numRows; ++i) {
+      setRow(i, new IntVector());
+    }
+  }
+
+  /**
+   * Get the number of rows in the matrix.
+   *
+   * @return the number of rows.
+   */
+  public int getNumRows() {
+    return numRows;
+  }
+
+  /**
+   * Get a specific entry of the matrix. The row must already be set.
+   *
+   * @param i the row
+   * @param j the column
+   * @return the value of the entry
+   */
+  public int get(int i, int j) {
+    return rows.get(i).get(j);
+  }
+
+  /**
+   * Set a specific entry of the matrix. The row must already be set.
+   *
+   * @param i the row
+   * @param j the column
+   * @param v the value of the entry
+   */
+  public void set(int i, int j, int v) {
+    rows.get(i).set(j, v);
+  }
+
+  /**
+   * Get a specific row of the matrix.
+   *
+   * @param i the row number
+   * @return the row of the matrix, or null if it has not been set
+   */
+  IntVector getRow(int i) {
+    return rows.get(i);
+  }
+
+  /**
+   * Set the int vector as the row specified.
+   *
+   * @param i the row
+   * @param vec the vector to set as the row
+   */
+  void setRow(int i, IntVector vec) {
+    rows.put(i, vec);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrixSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrixSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrixSumAggregator.java
new file mode 100644
index 0000000..b7afa60
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrixSumAggregator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.AggregatorUsage;
+import org.apache.giraph.master.MasterAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+
+/**
+ * The int matrix aggregator is used to register and aggregate int matrices.
+ */
+public class IntMatrixSumAggregator extends MatrixSumAggregator {
+  /** sparse vector with single entry */
+  private IntVector singletonVector = new IntVector();
+
+  /**
+   * Create a new matrix aggregator with the given prefix name for the vector
+   * aggregators.
+   *
+   * @param name the prefix for the row vector aggregators
+   */
+  public IntMatrixSumAggregator(String name) {
+    super(name);
+  }
+
+  /**
+   * Register the int vector aggregators, one for each row of the matrix.
+   *
+   * @param numRows the number of rows
+   * @param master the master to register the aggregators
+   */
+  public void register(int numRows, MasterAggregatorUsage master)
+    throws InstantiationException, IllegalAccessException {
+    for (int i = 0; i < numRows; ++i) {
+      master.registerAggregator(getRowAggregatorName(i),
+          IntVectorSumAggregator.class);
+    }
+  }
+
+  /**
+   * Add the given value to the entry specified.
+   *
+   * @param i the row
+   * @param j the column
+   * @param v the value
+   * @param worker the worker to aggregate
+   */
+  public void aggregate(int i, int j, int v, WorkerAggregatorUsage worker) {
+    singletonVector.clear();
+    singletonVector.set(j, v);
+    worker.aggregate(getRowAggregatorName(i), singletonVector);
+  }
+
+  /**
+   * Set the values of the matrix to the master specified. This is typically
+   * used in the master, to build an external IntMatrix and only set it at
+   * the end.
+   *
+   * @param matrix the matrix to set the values
+   * @param master the master
+   */
+  public void setMatrix(IntMatrix matrix, MasterAggregatorUsage master) {
+    int numRows = matrix.getNumRows();
+    for (int i = 0; i < numRows; ++i) {
+      master.setAggregatedValue(getRowAggregatorName(i), matrix.getRow(i));
+    }
+  }
+
+  /**
+   * Read the aggregated values of the matrix.
+   *
+   * @param numRows the number of rows
+   * @param aggUser the master or worker
+   * @return the int matrix
+   */
+  public IntMatrix getMatrix(int numRows, AggregatorUsage aggUser) {
+    IntMatrix matrix = new IntMatrix(numRows);
+    for (int i = 0; i < numRows; ++i) {
+      IntVector vec = aggUser.getAggregatedValue(getRowAggregatorName(i));
+      matrix.setRow(i, vec);
+    }
+    return matrix;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVector.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVector.java
new file mode 100644
index 0000000..e5bb400
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVector.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The int vector holds the values of a particular row.
+ */
+public class IntVector implements Writable {
+  /**
+   * The entries of the vector are (key, value) pairs of the form (index, value)
+   */
+  private Int2IntOpenHashMap entries = null;
+
+  /**
+   * Create a new vector with default size.
+   */
+  public IntVector() {
+    initialize(Int2IntOpenHashMap.DEFAULT_INITIAL_SIZE);
+  }
+
+  /**
+   * Create a new vector with given size.
+   *
+   * @param size the size of the vector
+   */
+  public IntVector(int size) {
+    initialize(size);
+  }
+
+  /**
+   * Initialize the values of the vector. The default value is 0.
+   *
+   * @param size the size of the vector
+   */
+  private void initialize(int size) {
+    entries = new Int2IntOpenHashMap(size);
+    entries.defaultReturnValue(0);
+  }
+
+  /**
+   * Get a particular entry of the vector.
+   *
+   * @param i the entry
+   * @return the value of the entry.
+   */
+  int get(int i) {
+    return entries.get(i);
+  }
+
+  /**
+   * Set the given value to the entry specified.
+   *
+   * @param i the entry
+   * @param value the value to set to the entry
+   */
+  void set(int i, int value) {
+    entries.put(i, value);
+  }
+
+  /**
+   * Clear the contents of the vector.
+   */
+  void clear() {
+    entries.clear();
+  }
+
+  /**
+   * Add the vector specified. This is a vector addition that does an
+   * element-by-element addition.
+   *
+   * @param other the vector to add.
+   */
+  void add(IntVector other) {
+    for (Entry<Integer, Integer> entry : other.entries.entrySet()) {
+      entries.addTo(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(entries.size());
+    for (Entry<Integer, Integer> entry : entries.entrySet()) {
+      out.writeInt(entry.getKey());
+      out.writeInt(entry.getValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int size = in.readInt();
+    initialize(size);
+    for (int i = 0; i < size; ++i) {
+      int row = in.readInt();
+      int value = in.readInt();
+      entries.put(row, value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVectorSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVectorSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVectorSumAggregator.java
new file mode 100644
index 0000000..b588331
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVectorSumAggregator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.BasicAggregator;
+
+/**
+ * The int vector aggregator is used to aggregate int vectors.
+ */
+public class IntVectorSumAggregator extends BasicAggregator<IntVector> {
+
+  @Override
+  public IntVector createInitialValue() {
+    return new IntVector();
+  }
+
+  @Override
+  public void aggregate(IntVector vector) {
+    getAggregatedValue().add(vector);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrix.java
new file mode 100644
index 0000000..dbc3ecb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrix.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+/**
+ * A long matrix holds the values of the entries in long vectors. It keeps one
+ * long vector per matrix row.
+ */
+public class LongMatrix {
+  /** The number of rows in the matrix */
+  private int numRows;
+  /** The rows of the matrix */
+  private Int2ObjectOpenHashMap<LongVector> rows;
+
+  /**
+   * Create a new matrix with the given number of rows.
+   *
+   * @param numRows the number of rows.
+   */
+  public LongMatrix(int numRows) {
+    this.numRows = numRows;
+    rows = new Int2ObjectOpenHashMap<LongVector>(numRows);
+    rows.defaultReturnValue(null);
+  }
+
+  /**
+   * Create an empty matrix with all values set to 0.
+   */
+  public void initialize() {
+    rows.clear();
+    for (int i = 0; i < numRows; ++i) {
+      setRow(i, new LongVector());
+    }
+  }
+
+  /**
+   * Get the number of rows in the matrix.
+   *
+   * @return the number of rows.
+   */
+  public int getNumRows() {
+    return numRows;
+  }
+
+  /**
+   * Get a specific entry of the matrix.
+   *
+   * @param i the row
+   * @param j the column
+   * @return the value of the entry
+   */
+  public long get(int i, int j) {
+    return rows.get(i).get(j);
+  }
+
+  /**
+   * Set a specific entry of the matrix.
+   *
+   * @param i the row
+   * @param j the column
+   * @param v the value of the entry
+   */
+  public void set(int i, int j, long v) {
+    rows.get(i).set(j, v);
+  }
+
+  /**
+   * Get a specific row of the matrix.
+   *
+   * @param i the row number
+   * @return the row of the matrix
+   */
+  LongVector getRow(int i) {
+    return rows.get(i);
+  }
+
+  /**
+   * Set the long vector as the row specified.
+   *
+   * @param i the row
+   * @param vec the vector to set as the row
+   */
+  void setRow(int i, LongVector vec) {
+    rows.put(i, vec);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrixSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrixSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrixSumAggregator.java
new file mode 100644
index 0000000..a7dc186
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrixSumAggregator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.AggregatorUsage;
+import org.apache.giraph.master.MasterAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+
+/**
+ * The long matrix aggregator is used to register and aggregate long matrices.
+ */
+public class LongMatrixSumAggregator extends MatrixSumAggregator {
+  /** sparse vector with single entry */
+  private LongVector singletonVector = new LongVector();
+
+  /**
+   * Create a new matrix aggregator with the given prefix name for the vector
+   * aggregators.
+   *
+   * @param name the prefix for the row vector aggregators
+   */
+  public LongMatrixSumAggregator(String name) {
+    super(name);
+  }
+
+  /**
+   * Register the long vector aggregators, one for each row of the matrix.
+   *
+   * @param numRows the number of rows
+   * @param master the master to register the aggregators
+   */
+  public void register(int numRows, MasterAggregatorUsage master)
+    throws InstantiationException, IllegalAccessException {
+    for (int i = 0; i < numRows; ++i) {
+      master.registerAggregator(getRowAggregatorName(i),
+          LongVectorSumAggregator.class);
+    }
+  }
+
+  /**
+   * Add the given value to the entry specified.
+   *
+   * @param i the row
+   * @param j the column
+   * @param v the value
+   * @param worker the worker to aggregate
+   */
+  public void aggregate(int i, int j, long v, WorkerAggregatorUsage worker) {
+    singletonVector.clear();
+    singletonVector.set(j, v);
+    worker.aggregate(getRowAggregatorName(i), singletonVector);
+  }
+
+  /**
+   * Set the values of the matrix to the master specified. This is typically
+   * used in the master, to build an external LongMatrix and only set it at
+   * the end.
+   *
+   * @param matrix the matrix to set the values
+   * @param master the master
+   */
+  public void setMatrix(LongMatrix matrix, MasterAggregatorUsage master) {
+    int numRows = matrix.getNumRows();
+    for (int i = 0; i < numRows; ++i) {
+      master.setAggregatedValue(getRowAggregatorName(i), matrix.getRow(i));
+    }
+  }
+
+  /**
+   * Read the aggregated values of the matrix.
+   *
+   * @param numRows the number of rows
+   * @param aggUser the master or worker
+   * @return the long matrix
+   */
+  public LongMatrix getMatrix(int numRows, AggregatorUsage aggUser) {
+    LongMatrix matrix = new LongMatrix(numRows);
+    for (int i = 0; i < numRows; ++i) {
+      LongVector vec = aggUser.getAggregatedValue(getRowAggregatorName(i));
+      matrix.setRow(i, vec);
+    }
+    return matrix;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVector.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVector.java
new file mode 100644
index 0000000..6781b43
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVector.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The long vector holds the values of a particular row.
+ */
+public class LongVector implements Writable {
+  /**
+   * The entries of the vector are (key, value) pairs of the form (index, value)
+   */
+  private Int2LongOpenHashMap entries = null;
+
+  /**
+   * Create a new vector with default size.
+   */
+  public LongVector() {
+    initialize(Int2LongOpenHashMap.DEFAULT_INITIAL_SIZE);
+  }
+
+  /**
+   * Create a new vector with given size.
+   *
+   * @param size the size of the vector
+   */
+  public LongVector(int size) {
+    initialize(size);
+  }
+
+  /**
+   * Initialize the values of the vector. The default value is 0.
+   *
+   * @param size the size of the vector
+   */
+  private void initialize(int size) {
+    entries = new Int2LongOpenHashMap(size);
+    entries.defaultReturnValue(0L);
+  }
+
+  /**
+   * Get a particular entry of the vector.
+   *
+   * @param i the entry
+   * @return the value of the entry.
+   */
+  long get(int i) {
+    return entries.get(i);
+  }
+
+  /**
+   * Set the given value to the entry specified.
+   *
+   * @param i the entry
+   * @param value the value to set to the entry
+   */
+  void set(int i, long value) {
+    entries.put(i, value);
+  }
+
+  /**
+   * Clear the contents of the vector.
+   */
+  void clear() {
+    entries.clear();
+  }
+
+  /**
+   * Add the vector specified. This is a vector addition that does an
+   * element-by-element addition.
+   *
+   * @param other the vector to add.
+   */
+  void add(LongVector other) {
+    for (Entry<Integer, Long> kv : other.entries.entrySet()) {
+      entries.addTo(kv.getKey(), kv.getValue());
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(entries.size());
+    for (Entry<Integer, Long> kv : entries.entrySet()) {
+      out.writeInt(kv.getKey());
+      out.writeLong(kv.getValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int size = in.readInt();
+    initialize(size);
+    for (int i = 0; i < size; ++i) {
+      int row = in.readInt();
+      long value = in.readLong();
+      entries.put(row, value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVectorSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVectorSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVectorSumAggregator.java
new file mode 100644
index 0000000..ed35e15
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVectorSumAggregator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.BasicAggregator;
+
+/**
+ * The long vector aggregator is used to aggregate long vectors.
+ */
+public class LongVectorSumAggregator extends BasicAggregator<LongVector> {
+
+  @Override
+  public LongVector createInitialValue() {
+    return new LongVector();
+  }
+
+  @Override
+  public void aggregate(LongVector vector) {
+    getAggregatedValue().add(vector);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/MatrixSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/MatrixSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/MatrixSumAggregator.java
new file mode 100644
index 0000000..3864472
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/MatrixSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+/**
+ * The abstract matrix aggregator contains the prefix name of the vector
+ * aggregators that have the values of the rows.
+ */
+public abstract class MatrixSumAggregator {
+  /**
+   * The prefix name of the row vector aggregators. The aggregator names are
+   * created as (name0, name1, ...).
+   */
+  private String name;
+
+  /**
+   * Create a new matrix aggregator with the given prefix name for the vector
+   * aggregators.
+   *
+   * @param name the prefix for the row vector aggregators
+   */
+  public MatrixSumAggregator(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Get the name of the aggregator of the row with the index specified.
+   *
+   * @param i the row of the matrix
+   * @return the name of the aggregator
+   */
+  protected String getRowAggregatorName(int i) {
+    return name + i;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/package-info.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/package-info.java
new file mode 100644
index 0000000..4297db7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of matrix aggregator.
+ */
+package org.apache.giraph.aggregators.matrix;

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestDoubleMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestDoubleMatrix.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestDoubleMatrix.java
new file mode 100644
index 0000000..d67eda1
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestDoubleMatrix.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.junit.Test;
+
+public class TestDoubleMatrix {
+  private static double E = 0.0001f;
+
+  @Test
+  public void testVectorAdd() {
+    // The default value should be 0
+    DoubleVector vec1 = new DoubleVector();
+    assertEquals(0.0, vec1.get(0), E);
+
+    // Basic get/set
+    vec1.set(0, 0.1);
+    vec1.set(10, 1.4);
+    assertEquals(0.1, vec1.get(0), E);
+    assertEquals(0.0, vec1.get(5), E);
+    assertEquals(1.4, vec1.get(10), E);
+
+    // Add another vector
+    DoubleVector vec2 = new DoubleVector();
+    vec2.set(0, 0.5);
+    vec2.set(5, 1.7);
+
+    vec1.add(vec2);
+    assertEquals(0.6, vec1.get(0), E);
+    assertEquals(1.7, vec1.get(5), E);
+    assertEquals(1.4, vec1.get(10), E);
+    assertEquals(0.0, vec1.get(15), E);
+  }
+
+  @Test
+  public void testVectorSerialize() throws Exception {
+    int size = 100;
+
+    // Serialize from
+    DoubleVector from = new DoubleVector(size);
+    from.set(0, 10.0);
+    from.set(10, 5.0);
+    from.set(12, 1.0);
+    byte[] data = WritableUtils.writeToByteArray(from);
+
+    // De-serialize to
+    DoubleVector to = new DoubleVector();
+    WritableUtils.readFieldsFromByteArray(data, to);
+
+    // The vectors should be equal
+    for (int i = 0; i < size; ++i) {
+      assertEquals(from.get(i), to.get(i), E);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestFloatMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestFloatMatrix.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestFloatMatrix.java
new file mode 100644
index 0000000..d0f9bb0
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestFloatMatrix.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.junit.Test;
+
+public class TestFloatMatrix {
+  private static float E = 0.0001f;
+
+  @Test
+  public void testVectorAdd() {
+    // The default value should be 0
+    FloatVector vec1 = new FloatVector();
+    assertEquals(0.0, vec1.get(0), E);
+
+    // Basic get/set
+    vec1.set(0, 0.1f);
+    vec1.set(10, 1.4f);
+    assertEquals(0.1, vec1.get(0), E);
+    assertEquals(0.0, vec1.get(5), E);
+    assertEquals(1.4, vec1.get(10), E);
+
+    // Add another vector
+    FloatVector vec2 = new FloatVector();
+    vec2.set(0, 0.5f);
+    vec2.set(5, 1.7f);
+
+    vec1.add(vec2);
+    assertEquals(0.6, vec1.get(0), E);
+    assertEquals(1.7, vec1.get(5), E);
+    assertEquals(1.4, vec1.get(10), E);
+    assertEquals(0.0, vec1.get(15), E);
+  }
+
+  @Test
+  public void testVectorSerialize() throws Exception {
+    int size = 100;
+
+    // Serialize from
+    FloatVector from = new FloatVector(size);
+    from.set(0, 10.0f);
+    from.set(10, 5.0f);
+    from.set(12, 1.0f);
+    byte[] data = WritableUtils.writeToByteArray(from);
+
+    // De-serialize to
+    FloatVector to = new FloatVector();
+    WritableUtils.readFieldsFromByteArray(data, to);
+
+    // The vectors should be equal
+    for (int i = 0; i < size; ++i) {
+      assertEquals(from.get(i), to.get(i), E);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestIntMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestIntMatrix.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestIntMatrix.java
new file mode 100644
index 0000000..e8d3561
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestIntMatrix.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.junit.Test;
+
+/** Tests for {@link IntVector} addition and serialization. */
+public class TestIntMatrix {
+  /** Checks default reads, get/set, and adding one vector into another. */
+  @Test
+  public void testVectorAdd() {
+    // An index that was never set reads as 0
+    IntVector target = new IntVector();
+    assertEquals(0, target.get(0));
+
+    // Stored entries read back; the unset index between them stays 0
+    target.set(0, 1);
+    target.set(10, 14);
+    assertEquals(1, target.get(0));
+    assertEquals(0, target.get(5));
+    assertEquals(14, target.get(10));
+
+    // The addend shares index 0 with the target and brings in index 5
+    IntVector addend = new IntVector();
+    addend.set(0, 5);
+    addend.set(5, 17);
+    target.add(addend);
+    // Expect 1 + 5 at 0, 17 at 5, 14 untouched at 10, and 0 at unset 15
+    assertEquals(6, target.get(0));
+    assertEquals(17, target.get(5));
+    assertEquals(14, target.get(10));
+    assertEquals(0, target.get(15));
+  }
+
+  /** Checks that a serialized vector reads back with the same values. */
+  @Test
+  public void testVectorSerialize() throws Exception {
+    // Populate a few entries and write the vector to a byte array
+    final int size = 100;
+    IntVector original = new IntVector(size);
+    original.set(0, 10);
+    original.set(10, 5);
+    original.set(12, 1);
+    byte[] bytes = WritableUtils.writeToByteArray(original);
+
+    // Read those bytes into a vector made with the no-arg constructor
+    IntVector restored = new IntVector();
+    WritableUtils.readFieldsFromByteArray(bytes, restored);
+    // Both vectors must agree at every index below size
+    for (int index = 0; index < size; index++) {
+      assertEquals(original.get(index), restored.get(index));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestLongMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestLongMatrix.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestLongMatrix.java
new file mode 100644
index 0000000..a0a7000
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestLongMatrix.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.junit.Test;
+
+/** Tests for {@link LongVector} addition and serialization. */
+public class TestLongMatrix {
+  /** Checks default reads, get/set, and adding one vector into another. */
+  @Test
+  public void testVectorAdd() {
+    // An index that was never set reads as 0
+    LongVector target = new LongVector();
+    assertEquals(0, target.get(0));
+
+    // Stored entries read back; the unset index between them stays 0
+    target.set(0, 1);
+    target.set(10, 14);
+    assertEquals(1, target.get(0));
+    assertEquals(0, target.get(5));
+    assertEquals(14, target.get(10));
+
+    // The addend shares index 0 with the target and brings in index 5
+    LongVector addend = new LongVector();
+    addend.set(0, 5);
+    addend.set(5, 17);
+    target.add(addend);
+    // Expect 1 + 5 at 0, 17 at 5, 14 untouched at 10, and 0 at unset 15
+    assertEquals(6, target.get(0));
+    assertEquals(17, target.get(5));
+    assertEquals(14, target.get(10));
+    assertEquals(0, target.get(15));
+  }
+
+  /** Checks that a serialized vector reads back with the same values. */
+  @Test
+  public void testVectorSerialize() throws Exception {
+    // Populate a few entries and write the vector to a byte array
+    final int size = 100;
+    LongVector original = new LongVector(size);
+    original.set(0, 10);
+    original.set(10, 5);
+    original.set(12, 1);
+    byte[] bytes = WritableUtils.writeToByteArray(original);
+
+    // Read those bytes into a vector made with the no-arg constructor
+    LongVector restored = new LongVector();
+    WritableUtils.readFieldsFromByteArray(bytes, restored);
+    // Both vectors must agree at every index below size
+    for (int index = 0; index < size; index++) {
+      assertEquals(original.get(index), restored.get(index));
+    }
+  }
+}


Mime
View raw message