incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r813233 [3/7] - in /incubator/hama/branches: ./ hama-0.19/ hama-0.19/bin/ hama-0.19/conf/ hama-0.19/lib/ hama-0.19/lib/findbugs/ hama-0.19/lib/findbugs/plugin/ hama-0.19/lib/jetty-ext/ hama-0.19/src/ hama-0.19/src/docs/ hama-0.19/src/docs/s...
Date Thu, 10 Sep 2009 05:32:59 GMT
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/AbstractMatrix.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/AbstractMatrix.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/AbstractMatrix.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,567 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionException;
+import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
+import org.apache.hadoop.hbase.mapred.TableMap;
+import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.algebra.JacobiEigenValue;
+import org.apache.hama.algebra.MatrixNormMapRed;
+import org.apache.hama.algebra.TransposeMap;
+import org.apache.hama.algebra.TransposeReduce;
+import org.apache.hama.algebra.MatrixNormMapRed.MatrixFrobeniusNormCombiner;
+import org.apache.hama.algebra.MatrixNormMapRed.MatrixFrobeniusNormMapper;
+import org.apache.hama.algebra.MatrixNormMapRed.MatrixFrobeniusNormReducer;
+import org.apache.hama.algebra.MatrixNormMapRed.MatrixInfinityNormMapper;
+import org.apache.hama.algebra.MatrixNormMapRed.MatrixInfinityNormReducer;
+import org.apache.hama.algebra.MatrixNormMapRed.MatrixMaxValueNormMapper;
+import org.apache.hama.algebra.MatrixNormMapRed.MatrixMaxValueNormReducer;
+import org.apache.hama.algebra.MatrixNormMapRed.MatrixOneNormCombiner;
+import org.apache.hama.algebra.MatrixNormMapRed.MatrixOneNormMapper;
+import org.apache.hama.algebra.MatrixNormMapRed.MatrixOneNormReducer;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.util.BytesUtil;
+import org.apache.hama.util.JobManager;
+import org.apache.hama.util.RandomVariable;
+import org.apache.log4j.Logger;
+
+/**
+ * Methods of the matrix classes
+ */
+public abstract class AbstractMatrix implements Matrix {
+  static int tryPathLength = Constants.DEFAULT_PATH_LENGTH;
+  static final Logger LOG = Logger.getLogger(AbstractMatrix.class);
+
+  protected HamaConfiguration config;
+  protected HBaseAdmin admin;
+  // A matrix only needs a table path pointing to the table that stores it;
+  // HamaAdmin manages the matrix name space.
+  protected String matrixPath;
+  protected HTable table;
+  protected HTableDescriptor tableDesc;
+  protected HamaAdmin hamaAdmin;
+
+  protected boolean closed = true;
+
+  /**
+   * Stores the given configuration and builds the HBase and Hama admin
+   * clients from it.
+   * 
+   * @param conf configuration object
+   * @throws MasterNotRunningException declared for the {@link HBaseAdmin}
+   *           construction
+   */
+  public void setConfiguration(HamaConfiguration conf)
+      throws MasterNotRunningException {
+    config = conf;
+    admin = new HBaseAdmin(conf);
+    hamaAdmin = new HamaAdminImpl(conf, admin);
+  }
+
+  /**
+   * Picks a random, unused table name of the form
+   * <code>table_prefix + "_" + random</code> and creates the matrix table
+   * under that name.
+   * 
+   * Up to {@link Constants#DEFAULT_TRY_TIMES} names are tried per path length.
+   * When they are all taken, the shared static <code>tryPathLength</code> is
+   * incremented (it is never reset here) and the attempts start over, until
+   * the length exceeds {@link Constants#DEFAULT_MAXPATHLEN}.
+   * 
+   * @param table_prefix prefix of the generated table name
+   * @throws IOException if no free name was found, or if creating the table
+   *           fails
+   */
+  protected void tryToCreateTable(String table_prefix) throws IOException {
+    int remaining = Constants.DEFAULT_TRY_TIMES;
+    while (true) {
+      matrixPath = table_prefix + "_"
+          + RandomVariable.randMatrixPath(tryPathLength);
+
+      if (!admin.tableExists(matrixPath)) {
+        // The name is free: claim it.
+        tableDesc = new HTableDescriptor(matrixPath);
+        create();
+        return;
+      }
+
+      remaining--;
+      if (remaining <= 0) {
+        // Every attempt at this length collided; try longer names.
+        tryPathLength++;
+        remaining = Constants.DEFAULT_TRY_TIMES;
+      }
+
+      if (tryPathLength > Constants.DEFAULT_MAXPATHLEN) {
+        break;
+      }
+    }
+
+    // All attempts are exhausted; let the caller know what happened.
+    throw new IOException("Try too many times to create a table in hbase.");
+  }
+
+  /**
+   * Creates the HBase table backing this matrix, unless a table named
+   * <code>matrixPath</code> already exists, in which case nothing is done.
+   * 
+   * The table gets the column, attribute, aliase, block and Jacobi eigenvalue
+   * families. Afterwards the method connects to the table, records the
+   * concrete matrix class name under {@link Constants#METADATA_TYPE} and sets
+   * the reference count to 1.
+   * 
+   * @throws IOException if an HBase operation fails
+   */
+  protected void create() throws IOException {
+    // Only run when the table doesn't exist yet.
+    if (admin.tableExists(matrixPath)) {
+      return;
+    }
+
+    // Matrix entries.
+    tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.COLUMN),
+        3, CompressionType.NONE, false, false, Integer.MAX_VALUE,
+        HConstants.FOREVER, false));
+    tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE));
+    tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY));
+    // Temporary block data.
+    tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK),
+        1, CompressionType.NONE, false, false, Integer.MAX_VALUE,
+        HConstants.FOREVER, false));
+    // Families used by the JacobiEigenValue computation.
+    tableDesc.addFamily(new HColumnDescriptor(Bytes
+        .toBytes(JacobiEigenValue.EI), 1, CompressionType.NONE, false, false,
+        Integer.MAX_VALUE, HConstants.FOREVER, false));
+    tableDesc.addFamily(new HColumnDescriptor(Bytes
+        .toBytes(JacobiEigenValue.EICOL), 10, CompressionType.NONE, false,
+        false, Integer.MAX_VALUE, HConstants.FOREVER, false));
+    tableDesc.addFamily(new HColumnDescriptor(Bytes
+        .toBytes(JacobiEigenValue.EIVEC), 10, CompressionType.NONE, false,
+        false, Integer.MAX_VALUE, HConstants.FOREVER, false));
+
+    LOG.info("Initializing the matrix storage.");
+    admin.createTable(tableDesc);
+    LOG.info("Create Matrix " + matrixPath);
+
+    // Connect to the freshly created table.
+    table = new HTable(config, matrixPath);
+    table.setAutoFlush(true);
+
+    // Record the matrix type in METADATA_TYPE.
+    String typeName = this.getClass().getSimpleName();
+    BatchUpdate typeUpdate = new BatchUpdate(Constants.METADATA);
+    typeUpdate.put(Constants.METADATA_TYPE, Bytes.toBytes(typeName));
+    table.commit(typeUpdate);
+
+    // A new matrix starts with a single reference.
+    setReference(1);
+  }
+
+  /**
+   * @return the HBase table handle held by this matrix
+   */
+  public HTable getHTable() {
+    return table;
+  }
+
+  /**
+   * Runs the one-norm map/reduce job (single reducer) over this matrix and
+   * returns the value read from the first record of the job output. The
+   * temporary output directory is deleted afterwards.
+   * 
+   * @return the value produced by the job
+   * @throws IOException if the job or the file system access fails
+   */
+  protected double getNorm1() throws IOException {
+    JobConf job = new JobConf(config);
+    job.setJobName("norm1 MR job : " + this.getPath());
+    job.setNumMapTasks(config.getNumMapTasks());
+    job.setNumReduceTasks(1);
+
+    final FileSystem fs = FileSystem.get(job);
+    String tmpName = getType() + "_TMP_norm1_dir_"
+        + System.currentTimeMillis();
+    Path requestedOut = new Path(new Path(tmpName), "out");
+    if (fs.exists(requestedOut)) {
+      fs.delete(requestedOut, true);
+    }
+
+    MatrixNormMapRed.initJob(this.getPath(), requestedOut.toString(),
+        MatrixOneNormMapper.class, MatrixOneNormCombiner.class,
+        MatrixOneNormReducer.class, job);
+
+    // Use the output dir as the job configuration reports it.
+    Path jobOut = FileOutputFormat.getOutputPath(job);
+    JobManager.execute(job);
+
+    // Read the first (key, value) record of the result file.
+    // NOTE(review): the other norm methods read "part-00000"; confirm that
+    // the one-norm reducer really writes "reduce-out".
+    IntWritable resultKey = new IntWritable();
+    DoubleWritable resultValue = new DoubleWritable();
+    SequenceFile.Reader in = new SequenceFile.Reader(fs, new Path(jobOut,
+        "reduce-out"), job);
+    try {
+      in.next(resultKey, resultValue);
+    } finally {
+      in.close();
+    }
+
+    fs.delete(jobOut.getParent(), true);
+    return resultValue.get();
+  }
+
+  /**
+   * Runs the max-value norm map/reduce job (single reducer) over this matrix
+   * and returns the value read from the first record of
+   * <code>part-00000</code>. The temporary output directory is deleted
+   * afterwards.
+   * 
+   * @return the value produced by the job
+   * @throws IOException if the job or the file system access fails
+   */
+  protected double getMaxvalue() throws IOException {
+    JobConf job = new JobConf(config);
+    job.setJobName("MaxValue Norm MR job : " + this.getPath());
+    job.setNumMapTasks(config.getNumMapTasks());
+    job.setNumReduceTasks(1);
+
+    final FileSystem fs = FileSystem.get(job);
+    String tmpName = getType() + "_TMP_normMaxValue_dir_"
+        + System.currentTimeMillis();
+    Path requestedOut = new Path(new Path(tmpName), "out");
+    if (fs.exists(requestedOut)) {
+      fs.delete(requestedOut, true);
+    }
+
+    // The reducer class doubles as the combiner for this job.
+    MatrixNormMapRed.initJob(this.getPath(), requestedOut.toString(),
+        MatrixMaxValueNormMapper.class, MatrixMaxValueNormReducer.class,
+        MatrixMaxValueNormReducer.class, job);
+
+    // Use the output dir as the job configuration reports it.
+    Path jobOut = FileOutputFormat.getOutputPath(job);
+    JobManager.execute(job);
+
+    // Read the first (key, value) record of the result file.
+    IntWritable resultKey = new IntWritable();
+    DoubleWritable resultValue = new DoubleWritable();
+    SequenceFile.Reader in = new SequenceFile.Reader(fs, new Path(jobOut,
+        "part-00000"), job);
+    try {
+      in.next(resultKey, resultValue);
+    } finally {
+      in.close();
+    }
+
+    fs.delete(jobOut.getParent(), true);
+    return resultValue.get();
+  }
+
+  /**
+   * Runs the infinity-norm map/reduce job (single reducer) over this matrix
+   * and returns the value read from the first record of
+   * <code>part-00000</code>. The temporary output directory is deleted
+   * afterwards.
+   * 
+   * @return the value produced by the job
+   * @throws IOException if the job or the file system access fails
+   */
+  protected double getInfinity() throws IOException {
+    JobConf job = new JobConf(config);
+    job.setJobName("Infinity Norm MR job : " + this.getPath());
+    job.setNumMapTasks(config.getNumMapTasks());
+    job.setNumReduceTasks(1);
+
+    final FileSystem fs = FileSystem.get(job);
+    String tmpName = getType() + "_TMP_normInifity_dir_"
+        + System.currentTimeMillis();
+    Path requestedOut = new Path(new Path(tmpName), "out");
+    if (fs.exists(requestedOut)) {
+      fs.delete(requestedOut, true);
+    }
+
+    // The reducer class doubles as the combiner for this job.
+    MatrixNormMapRed.initJob(this.getPath(), requestedOut.toString(),
+        MatrixInfinityNormMapper.class, MatrixInfinityNormReducer.class,
+        MatrixInfinityNormReducer.class, job);
+
+    // Use the output dir as the job configuration reports it.
+    Path jobOut = FileOutputFormat.getOutputPath(job);
+    JobManager.execute(job);
+
+    // Read the first (key, value) record of the result file.
+    IntWritable resultKey = new IntWritable();
+    DoubleWritable resultValue = new DoubleWritable();
+    SequenceFile.Reader in = new SequenceFile.Reader(fs, new Path(jobOut,
+        "part-00000"), job);
+    try {
+      in.next(resultKey, resultValue);
+    } finally {
+      in.close();
+    }
+
+    fs.delete(jobOut.getParent(), true);
+    return resultValue.get();
+  }
+
+  /**
+   * Runs the Frobenius-norm map/reduce job (single reducer) over this matrix
+   * and returns the value read from the first record of
+   * <code>part-00000</code>. The temporary output directory is deleted
+   * afterwards.
+   * 
+   * @return the value produced by the job
+   * @throws IOException if the job or the file system access fails
+   */
+  protected double getFrobenius() throws IOException {
+    JobConf job = new JobConf(config);
+    job.setJobName("Frobenius Norm MR job : " + this.getPath());
+    job.setNumMapTasks(config.getNumMapTasks());
+    job.setNumReduceTasks(1);
+
+    final FileSystem fs = FileSystem.get(job);
+    String tmpName = getType() + "_TMP_normFrobenius_dir_"
+        + System.currentTimeMillis();
+    Path requestedOut = new Path(new Path(tmpName), "out");
+    if (fs.exists(requestedOut)) {
+      fs.delete(requestedOut, true);
+    }
+
+    MatrixNormMapRed.initJob(this.getPath(), requestedOut.toString(),
+        MatrixFrobeniusNormMapper.class, MatrixFrobeniusNormCombiner.class,
+        MatrixFrobeniusNormReducer.class, job);
+
+    // Use the output dir as the job configuration reports it.
+    Path jobOut = FileOutputFormat.getOutputPath(job);
+    JobManager.execute(job);
+
+    // Read the first (key, value) record of the result file.
+    IntWritable resultKey = new IntWritable();
+    DoubleWritable resultValue = new DoubleWritable();
+    SequenceFile.Reader in = new SequenceFile.Reader(fs, new Path(jobOut,
+        "part-00000"), job);
+    try {
+      in.next(resultKey, resultValue);
+    } finally {
+      in.close();
+    }
+
+    fs.delete(jobOut.getParent(), true);
+    return resultValue.get();
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * Reads the row count from the metadata row; yields 0 when no row count
+   * has been stored.
+   */
+  public int getRows() throws IOException {
+    Cell stored = table.get(Constants.METADATA, Constants.METADATA_ROWS);
+    if (stored == null) {
+      return 0;
+    }
+    return BytesUtil.bytesToInt(stored.getValue());
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * Reads the column count from the metadata row; yields 0 when no column
+   * count has been stored, matching {@link #getRows()}.
+   */
+  public int getColumns() throws IOException {
+    Cell columns = table.get(Constants.METADATA, Constants.METADATA_COLUMNS);
+    // The cell is absent until the dimension has been set; avoid the NPE.
+    return (columns != null) ? BytesUtil.bytesToInt(columns.getValue()) : 0;
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * The label is read from the <code>attribute:string</code> column of the
+   * given row; <code>null</code> is returned when that cell is absent.
+   */
+  public String getRowLabel(int row) throws IOException {
+    Cell label = table.get(BytesUtil.getRowIndex(row), Bytes
+        .toBytes(Constants.ATTRIBUTE + "string"));
+    if (label == null) {
+      return null;
+    }
+    return Bytes.toString(label.getValue());
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * The label is read from the {@link Constants#CINDEX} row, column
+   * <code>attribute:&lt;column&gt;</code>; <code>null</code> is returned when
+   * that cell is absent.
+   */
+  public String getColumnLabel(int column) throws IOException {
+    String labelColumn = Constants.ATTRIBUTE + column;
+    Cell label = table.get(Constants.CINDEX, labelColumn);
+    if (label == null) {
+      return null;
+    }
+    return Bytes.toString(label.getValue());
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * The label is stored in the <code>attribute:string</code> column of the
+   * given row.
+   */
+  public void setRowLabel(int row, String name) throws IOException {
+    VectorUpdate labelUpdate = new VectorUpdate(row);
+    labelUpdate.put(Constants.ATTRIBUTE + "string", name);
+    table.commit(labelUpdate.getBatchUpdate());
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * Both counts are written to the metadata row in a single commit.
+   */
+  public void setDimension(int rows, int columns) throws IOException {
+    VectorUpdate dimension = new VectorUpdate(Constants.METADATA);
+    dimension.put(Constants.METADATA_ROWS, rows);
+    dimension.put(Constants.METADATA_COLUMNS, columns);
+    table.commit(dimension.getBatchUpdate());
+  }
+
+  /** {@inheritDoc} */
+  public void add(int i, int j, double value) throws IOException {
+    VectorUpdate update = new VectorUpdate(i);
+    update.put(j, value + this.get(i, j));
+    table.commit(update.getBatchUpdate());
+
+  }
+
+  /**
+   * Mapper that scans a table and re-emits every row as a
+   * {@link BatchUpdate}.
+   * 
+   * When a scaling factor has been registered through
+   * {@link #setAlpha(double)}, the values of cells whose column name starts
+   * with {@link Constants#COLUMN} are multiplied by it; every other cell is
+   * copied unchanged.
+   * 
+   * NOTE(review): the factor lives in a static field of this class, so it is
+   * only visible to mappers running in the JVM that called
+   * {@link #setAlpha(double)}, and it is never cleared. Confirm this is
+   * acceptable for distributed runs.
+   */
+  public static class TableReadMapper extends MapReduceBase implements
+      TableMap<ImmutableBytesWritable, BatchUpdate> {
+    /** Empty when no scaling is requested, otherwise holds the factor. */
+    private static List<Double> alpha = new ArrayList<Double>();
+
+    /**
+     * Copies one row into a {@link BatchUpdate}, scaling matrix entries when
+     * a factor is set, and emits it under the same key.
+     * 
+     * @param key row key of the scanned row
+     * @param value cells of the scanned row
+     * @param output collector receiving the (key, update) pair
+     * @param reporter unused
+     * @throws IOException if collecting the output fails
+     */
+    @SuppressWarnings("unchecked")
+    public void map(ImmutableBytesWritable key, RowResult value,
+        OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
+        @SuppressWarnings("unused")
+        Reporter reporter) throws IOException {
+
+      boolean scale = alpha.size() > 0;
+      BatchUpdate update = new BatchUpdate(key.get());
+      for (Map.Entry<byte[], Cell> e : value.entrySet()) {
+        byte[] column = e.getKey();
+        byte[] cellValue = e.getValue().getValue();
+
+        // Decode the column name with Bytes.toString rather than the
+        // platform default charset used by new String(byte[]).
+        if (scale && Bytes.toString(column).startsWith(Constants.COLUMN)) {
+          double currValue = BytesUtil.bytesToDouble(cellValue);
+          update.put(column, BytesUtil.doubleToBytes(currValue
+              * alpha.get(0)));
+        } else {
+          update.put(column, cellValue);
+        }
+      }
+      output.collect(key, update);
+    }
+
+    /**
+     * Registers the scaling factor, replacing any previously set one.
+     * 
+     * @param a factor applied to matrix entries
+     */
+    public static void setAlpha(double a) {
+      if (alpha.size() > 0)
+        alpha = new ArrayList<Double>();
+      alpha.add(a);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * Runs a map/reduce job that reads the column, attribute, aliase and block
+   * families of <code>B</code>'s table through {@link TableReadMapper} and
+   * writes the rows into this matrix's table.
+   * 
+   * NOTE(review): this method does not touch the static factor of
+   * {@link TableReadMapper}; a factor left over from an earlier
+   * <code>set(alpha, B)</code> call in the same JVM may still be applied.
+   */
+  public Matrix set(Matrix B) throws IOException {
+    JobConf job = new JobConf(config);
+    job.setJobName("set MR job : " + this.getPath());
+    job.setNumMapTasks(config.getNumMapTasks());
+    job.setNumReduceTasks(config.getNumReduceTasks());
+
+    // Space-separated list of the families to scan.
+    String families = Constants.COLUMN + " " + Constants.ATTRIBUTE + " "
+        + Constants.ALIASEFAMILY + " " + Constants.BLOCK;
+    TableMapReduceUtil.initTableMapJob(B.getPath(), families,
+        TableReadMapper.class, ImmutableBytesWritable.class,
+        BatchUpdate.class, job);
+    TableMapReduceUtil.initTableReduceJob(this.getPath(),
+        IdentityTableReduce.class, job);
+
+    JobManager.execute(job);
+    return this;
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * Registers <code>alpha</code> with {@link TableReadMapper} and then runs a
+   * map/reduce job that reads the column, attribute, aliase and block
+   * families of <code>B</code>'s table and writes the rows into this matrix's
+   * table.
+   */
+  public Matrix set(double alpha, Matrix B) throws IOException {
+    JobConf job = new JobConf(config);
+    job.setJobName("set MR job : " + this.getPath());
+    job.setNumMapTasks(config.getNumMapTasks());
+    job.setNumReduceTasks(config.getNumReduceTasks());
+
+    // The mapper picks the factor up from its static field.
+    TableReadMapper.setAlpha(alpha);
+
+    // Space-separated list of the families to scan.
+    String families = Constants.COLUMN + " " + Constants.ATTRIBUTE + " "
+        + Constants.ALIASEFAMILY + " " + Constants.BLOCK;
+    TableMapReduceUtil.initTableMapJob(B.getPath(), families,
+        TableReadMapper.class, ImmutableBytesWritable.class,
+        BatchUpdate.class, job);
+    TableMapReduceUtil.initTableReduceJob(this.getPath(),
+        IdentityTableReduce.class, job);
+
+    JobManager.execute(job);
+    return this;
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * The label is committed to the {@link Constants#CINDEX} row.
+   */
+  public void setColumnLabel(int column, String name) throws IOException {
+    VectorUpdate labelUpdate = new VectorUpdate(Constants.CINDEX);
+    labelUpdate.put(column, name);
+    table.commit(labelUpdate.getBatchUpdate());
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * The path is the name of the HBase table that stores this matrix.
+   */
+  public String getPath() {
+    return this.matrixPath;
+  }
+
+  /**
+   * Writes the reference count to the metadata row.
+   * 
+   * @param reference value to store under
+   *          {@link Constants#METADATA_REFERENCE}
+   * @throws IOException if the commit fails
+   */
+  protected void setReference(int reference) throws IOException {
+    BatchUpdate refUpdate = new BatchUpdate(Constants.METADATA);
+    refUpdate.put(Constants.METADATA_REFERENCE, Bytes.toBytes(reference));
+    table.commit(refUpdate);
+  }
+
+  /**
+   * Increments the stored reference count and writes it back. When no count
+   * is stored yet, 1 is written.
+   * 
+   * @return the reference count after the update
+   * @throws IOException if reading or writing the metadata fails
+   */
+  protected int incrementAndGetRef() throws IOException {
+    Cell stored = table.get(Constants.METADATA, Constants.METADATA_REFERENCE);
+
+    int reference;
+    if (stored == null) {
+      reference = 1;
+    } else {
+      reference = Bytes.toInt(stored.getValue()) + 1;
+    }
+
+    setReference(reference);
+    return reference;
+  }
+
+  /**
+   * Decrements the stored reference count when it is positive and writes the
+   * result back. When no count is stored, 0 is written.
+   * 
+   * @return the reference count after the update
+   * @throws IOException if reading or writing the metadata fails
+   */
+  protected int decrementAndGetRef() throws IOException {
+    Cell stored = table.get(Constants.METADATA, Constants.METADATA_REFERENCE);
+
+    int reference = 0;
+    if (stored != null) {
+      int current = Bytes.toInt(stored.getValue());
+      // A count that is already zero (or below) is left untouched.
+      reference = (current > 0) ? current - 1 : current;
+    }
+
+    setReference(reference);
+    return reference;
+  }
+
+  /**
+   * Tells whether an aliase name is stored in the metadata row.
+   * 
+   * @return <code>true</code> if the {@link Constants#ALIASENAME} cell exists
+   * @throws IOException if reading the metadata fails
+   */
+  protected boolean hasAliaseName() throws IOException {
+    Cell aliase = table.get(Constants.METADATA, Constants.ALIASENAME);
+    return aliase != null;
+  }
+
+  /**
+   * Releases this matrix's reference to its table. Does nothing when the
+   * matrix is already marked closed.
+   * 
+   * When the decremented reference count is zero or below, the table has no
+   * aliase name and the table is enabled, the table is disabled (retrying
+   * while it still reports enabled) and then deleted.
+   * 
+   * @throws IOException if an HBase operation fails
+   */
+  public void close() throws IOException {
+    if (closed) { // have been closed
+      return;
+    }
+
+    int remaining = decrementAndGetRef();
+    // Only unreferenced, un-aliased tables are dropped.
+    if (remaining <= 0 && !hasAliaseName()
+        && admin.isTableEnabled(matrixPath)) {
+      while (admin.isTableEnabled(matrixPath)) {
+        try {
+          admin.disableTable(matrixPath);
+        } catch (RegionException e) {
+          // Logged and retried by the enclosing loop.
+          LOG.warn(e);
+        }
+      }
+      admin.deleteTable(matrixPath);
+    }
+
+    closed = true;
+  }
+
+  public Matrix transpose() throws IOException {
+    Matrix result;
+    if (this.getType().equals("SparseMatrix")) {
+      result = new SparseMatrix(config, this.getRows(), this.getColumns());
+    } else {
+      result = new DenseMatrix(config, this.getRows(), this.getColumns());
+    }
+
+    JobConf jobConf = new JobConf(config);
+    jobConf.setJobName("transpose MR job" + result.getPath());
+
+    jobConf.setNumMapTasks(config.getNumMapTasks());
+    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+    TransposeMap.initJob(this.getPath(), TransposeMap.class, IntWritable.class,
+        MapWritable.class, jobConf);
+    TransposeReduce.initJob(result.getPath(), TransposeReduce.class, jobConf);
+
+    JobManager.execute(jobConf);
+    return result;
+  }
+
+  /**
+   * Records the aliase name and the matrix type in the metadata row, then
+   * hands the matrix to the Hama admin to be saved under that name. A matrix
+   * has only one aliase name; a later call overwrites the stored one.
+   * 
+   * @param aliasename name to save the matrix under
+   * @return the result of <code>hamaAdmin.save</code>
+   * @throws IOException if the commit or the save fails
+   */
+  public boolean save(String aliasename) throws IOException {
+    // Mark and update the aliase name in the "aliase:name" meta column.
+    BatchUpdate aliaseUpdate = new BatchUpdate(Constants.METADATA);
+    aliaseUpdate.put(Constants.ALIASENAME, Bytes.toBytes(aliasename));
+    aliaseUpdate.put(Constants.ATTRIBUTE + "type", Bytes.toBytes(this
+        .getType()));
+    table.commit(aliaseUpdate);
+
+    return hamaAdmin.save(this, aliasename);
+  }
+}

Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/AbstractVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/AbstractVector.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/AbstractVector.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/AbstractVector.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,98 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Methods of the vector classes
+ */
+public abstract class AbstractVector {
+  static final Logger LOG = Logger.getLogger(AbstractVector.class);
+  protected MapWritable entries;
+
+  public void initMap(RowResult row) {
+    this.entries = new MapWritable();
+    for (Map.Entry<byte[], Cell> f : row.entrySet()) {
+      this.entries.put(new IntWritable(BytesUtil.getColumnIndex(f.getKey())),
+          new DoubleEntry(f.getValue()));
+    }
+  }
+
+  /**
+   * Returns an Iterator.
+   * 
+   * @return iterator
+   */
+  public Iterator<Writable> iterator() {
+    return this.entries.values().iterator();
+  }
+
+  /**
+   * Returns a size of vector. If vector is sparse, returns the number of only
+   * non-zero elements.
+   * 
+   * @return a size of vector
+   */
+  public int size() {
+    int x = 0;
+    if (this.entries != null && this.entries.containsKey(new Text("row")))
+      x = 1;
+
+    return (this.entries != null) ? this.entries.size() - x : 0;
+  }
+
+  /**
+   * Returns the {@link org.apache.hadoop.io.MapWritable}
+   * 
+   * @return the entries of vector
+   */
+  public MapWritable getEntries() {
+    return this.entries;
+  }
+
+  /**
+   * Checks for conformant sizes
+   */
+  protected void checkComformantSize(Vector v2) {
+    if (this.size() != v2.size()) {
+      throw new IndexOutOfBoundsException("v1.size != v2.size (" + this.size()
+          + " != " + v2.size() + ")");
+    }
+  }
+
+  /**
+   * Clears the entries.
+   */
+  public void clear() {
+    this.entries = null;
+  }
+}

Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Constants.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Constants.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Constants.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,96 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Constants shared across Hama.
+ */
+public class Constants {
+
+  /** Name of the row that stores the matrix metadata */
+  public static final String METADATA = "metadata";
+
+  /** Name of the row that stores the column index and attributes */
+  public static final String CINDEX = "cIndex";
+
+  /** The attribute column family */
+  public static final String ATTRIBUTE = "attribute:";
+
+  /** The number of the matrix rows */
+  public static final String METADATA_ROWS = "attribute:rows";
+
+  /** The number of the matrix columns */
+  public static final String METADATA_COLUMNS = "attribute:columns";
+
+  /** The type of the matrix */
+  public static final String METADATA_TYPE = "attribute:type";
+
+  /**
+   * The reference count of the matrix.
+   * 
+   * (1) When a Matrix object is created and connects to the hbase table, the
+   *     reference of the table is incremented.
+   * (2) When a Matrix object is closed and disconnects from the hbase table,
+   *     the reference of the table is decremented.
+   *     i)  If the reference is not zero, the table must not be deleted,
+   *         because some other matrix object is connected to it.
+   *     ii) If the reference is zero, it depends on whether the table is
+   *         aliased:
+   *         1) an aliased table must not be deleted;
+   *         2) a table that is not aliased needs to be deleted.
+   */
+  public static final String METADATA_REFERENCE = "attribute:reference";
+
+  /** The aliase names column family */
+  public static final String ALIASEFAMILY = "aliase:";
+
+  /** The aliase names of the matrix, separated by "," */
+  public static final String ALIASENAME = "aliase:name";
+
+  /** Default columnFamily name */
+  public static final String COLUMN = "column:";
+
+  /** Temporary random matrices name prefix */
+  public static final String RANDOM = "rand";
+
+  /** Admin table name */
+  public static final String ADMINTABLE = "admin.table";
+
+  /** Matrix path columnFamily */
+  public static final String PATHCOLUMN = "path:";
+
+  /** Temporary aliase name prefix in the Hama shell */
+  public static final String RANDOMALIASE = "_";
+
+  /** Default length of a matrix's path (table name) */
+  public static final int DEFAULT_PATH_LENGTH = 5;
+
+  /** Default maximum length of a matrix's path (table name) */
+  public static final int DEFAULT_MAXPATHLEN = 10000;
+
+  /** Default number of tries to generate a suitable table name */
+  public static final int DEFAULT_TRY_TIMES = 10000000;
+
+  /** Block data column family */
+  public static final String BLOCK = "block:";
+
+  /** The text key "row" */
+  public static final Text ROWCOUNT = new Text("row");
+}

Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/DenseMatrix.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/DenseMatrix.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/DenseMatrix.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,930 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hama.algebra.BlockMultiplyMap;
+import org.apache.hama.algebra.BlockMultiplyReduce;
+import org.apache.hama.algebra.DenseMatrixVectorMultMap;
+import org.apache.hama.algebra.DenseMatrixVectorMultReduce;
+import org.apache.hama.algebra.JacobiEigenValue;
+import org.apache.hama.algebra.RowCyclicAdditionMap;
+import org.apache.hama.algebra.RowCyclicAdditionReduce;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.io.Pair;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.CollectBlocksMapper;
+import org.apache.hama.mapred.DummyMapper;
+import org.apache.hama.mapred.RandomMatrixMap;
+import org.apache.hama.mapred.RandomMatrixReduce;
+import org.apache.hama.mapred.VectorInputFormat;
+import org.apache.hama.util.BytesUtil;
+import org.apache.hama.util.JobManager;
+import org.apache.hama.util.RandomVariable;
+
+/**
+ * This class represents a dense matrix.
+ */
+public class DenseMatrix extends AbstractMatrix implements Matrix {
+  /** Simple class name; handed to tryToCreateTable and used as "matrix.type". */
+  static private final String TABLE_PREFIX = DenseMatrix.class.getSimpleName();
+  /** Scratch directory that holds the generated map inputs of random_mapred. */
+  static private final Path TMP_DIR = new Path(DenseMatrix.class
+      .getSimpleName()
+      + "_TMP_dir");
+
+  /**
+   * Creates an empty m-by-n matrix: a new table is created in HBase and the
+   * dimension is recorded, but no element is written.
+   * 
+   * @param conf configuration object
+   * @param m the number of rows.
+   * @param n the number of columns.
+   * @throws IOException if the table could not be created or the dimension
+   *                 could not be stored.
+   */
+  public DenseMatrix(HamaConfiguration conf, int m, int n) throws IOException {
+    setConfiguration(conf);
+    tryToCreateTable(TABLE_PREFIX);
+
+    // the matrix is usable from here on; record its shape last
+    closed = false;
+    setDimension(m, n);
+  }
+
+  /**
+   * Create/load a matrix aliased as 'matrixName'.
+   * 
+   * @param conf configuration object
+   * @param matrixName the name of the matrix
+   * @param force if force is true, a new matrix will be created no matter
+   *                'matrixName' has aliased to an existed matrix; otherwise,
+   *                just try to load an existed matrix alised 'matrixName'.
+   * @throws IOException
+   */
+  public DenseMatrix(HamaConfiguration conf, String matrixName, boolean force)
+      throws IOException {
+    setConfiguration(conf);
+    // if force is set to true:
+    // 1) if this matrixName has aliase to other matrix, we will remove
+    // the old aliase, create a new matrix table, and aliase to it.
+
+    // 2) if this matrixName has no aliase to other matrix, we will create
+    // a new matrix table, and alise to it.
+    //
+    // if force is set to false, we just try to load an existed matrix alised
+    // as 'matrixname'.
+
+    boolean existed = hamaAdmin.matrixExists(matrixName);
+
+    if (force) {
+      if (existed) {
+        // remove the old aliase
+        hamaAdmin.delete(matrixName);
+      }
+      // create a new matrix table.
+      tryToCreateTable(TABLE_PREFIX);
+      // save the new aliase relationship
+      save(matrixName);
+    } else {
+      if (existed) {
+        // try to get the actual path of the table
+        matrixPath = hamaAdmin.getPath(matrixName);
+        // load the matrix
+        table = new HTable(conf, matrixPath);
+        // increment the reference
+        incrementAndGetRef();
+      } else {
+        throw new IOException("Try to load non-existed matrix alised as "
+            + matrixName);
+      }
+    }
+
+    closed = false;
+  }
+
+  /**
+   * Opens an existing matrix table directly by its table name 'matrixpath'
+   * (not by an alias). Intended for internal use inside map/reduce tasks.
+   * <p>
+   * Unlike the other constructors, this one neither increments the table's
+   * reference count nor touches the 'closed' flag.
+   * 
+   * @param conf configuration object
+   * @param matrixpath the table name of the matrix to open
+   * @throws IOException if the table cannot be opened
+   */
+  public DenseMatrix(HamaConfiguration conf, String matrixpath)
+      throws IOException {
+    setConfiguration(conf);
+    this.matrixPath = matrixpath;
+    this.table = new HTable(conf, matrixpath);
+    // TODO: the reference count is deliberately left alone here. Inside the
+    // Add & Mul map/reduce process there is no obvious place to call
+    // Matrix.close, so an increment could never be balanced by a decrement.
+  }
+
+  /**
+   * Create an m-by-n constant matrix.
+   * 
+   * @param conf configuration object
+   * @param m the number of rows.
+   * @param n the number of columns.
+   * @param s fill the matrix with this scalar value.
+   * @throws IOException throw the exception to let the user know what happened,
+   *                 if we didn't create the matrix successfully.
+   */
+  public DenseMatrix(HamaConfiguration conf, int m, int n, double s)
+      throws IOException {
+    setConfiguration(conf);
+
+    tryToCreateTable(TABLE_PREFIX);
+
+    closed = false;
+
+    // Record the dimension BEFORE filling: set() validates its indices
+    // against getRows()/getColumns(), so the shape has to be known first.
+    setDimension(m, n);
+
+    for (int i = 0; i < m; i++) {
+      for (int j = 0; j < n; j++) {
+        set(i, j, s);
+      }
+    }
+  }
+
+  /**
+   * Generate matrix with random elements
+   * 
+   * @param conf configuration object
+   * @param m the number of rows.
+   * @param n the number of columns.
+   * @return an m-by-n matrix with uniformly distributed random elements.
+   * @throws IOException
+   */
+  public static DenseMatrix random(HamaConfiguration conf, int m, int n)
+      throws IOException {
+    DenseMatrix rand = new DenseMatrix(conf, m, n);
+    DenseVector vector = new DenseVector();
+    LOG.info("Create the " + m + " * " + n + " random matrix : "
+        + rand.getPath());
+
+    for (int i = 0; i < m; i++) {
+      vector.clear();
+      for (int j = 0; j < n; j++) {
+        vector.set(j, RandomVariable.rand());
+      }
+      rand.setRow(i, vector);
+    }
+
+    return rand;
+  }
+
+  /**
+   * Generate matrix with random elements using Map/Reduce.
+   * <p>
+   * One small sequence file per map task is written below TMP_DIR; each file
+   * holds a single (start row, end row) pair that tells the task which rows to
+   * generate. TMP_DIR is removed again when the method finishes, whether the
+   * job succeeded or not.
+   * 
+   * @param conf configuration object
+   * @param m the number of rows.
+   * @param n the number of columns.
+   * @return an m-by-n matrix with uniformly distributed random elements.
+   * @throws IOException
+   */
+  public static DenseMatrix random_mapred(HamaConfiguration conf, int m, int n)
+      throws IOException {
+    DenseMatrix rand = new DenseMatrix(conf, m, n);
+    LOG.info("Create the " + m + " * " + n + " random matrix : "
+        + rand.getPath());
+
+    final int numMapTasks = conf.getNumMapTasks();
+
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setJobName("random matrix MR job : " + rand.getPath());
+
+    jobConf.setNumMapTasks(numMapTasks);
+    jobConf.setNumReduceTasks(conf.getNumReduceTasks());
+
+    final Path inDir = new Path(TMP_DIR, "in");
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    jobConf.setMapperClass(RandomMatrixMap.class);
+    jobConf.setMapOutputKeyClass(IntWritable.class);
+    jobConf.setMapOutputValueClass(MapWritable.class);
+
+    RandomMatrixReduce.initJob(rand.getPath(), RandomMatrixReduce.class,
+        jobConf);
+    jobConf.setSpeculativeExecution(false);
+    jobConf.setInt("matrix.column", n);
+    jobConf.set("matrix.type", TABLE_PREFIX);
+    jobConf.set("matrix.density", "100");
+
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    final FileSystem fs = FileSystem.get(jobConf);
+    // rows per map task; the last task additionally takes the remainder
+    int interval = m / numMapTasks;
+
+    try {
+      // generate an input file for each map task
+      for (int i = 0; i < numMapTasks; ++i) {
+        final Path file = new Path(inDir, "part" + i);
+        final IntWritable start = new IntWritable(i * interval);
+        IntWritable end = null;
+        if ((i + 1) != numMapTasks) {
+          end = new IntWritable(((i * interval) + interval) - 1);
+        } else {
+          end = new IntWritable(m - 1);
+        }
+        final SequenceFile.Writer writer = SequenceFile.createWriter(fs,
+            jobConf, file, IntWritable.class, IntWritable.class,
+            CompressionType.NONE);
+        try {
+          writer.append(start, end);
+        } finally {
+          writer.close();
+        }
+        LOG.info("Wrote input for Map #" + i);
+      }
+
+      JobClient.runJob(jobConf);
+    } finally {
+      // do not leave the scratch inputs behind when something above failed
+      fs.delete(TMP_DIR, true);
+    }
+    return rand;
+  }
+
+  /**
+   * Generate identity matrix
+   * 
+   * @param conf configuration object
+   * @param m the number of rows.
+   * @param n the number of columns.
+   * @return an m-by-n matrix with ones on the diagonal and zeros elsewhere.
+   * @throws IOException
+   */
+  public static DenseMatrix identity(HamaConfiguration conf, int m, int n)
+      throws IOException {
+    DenseMatrix identity = new DenseMatrix(conf, m, n);
+    LOG.info("Create the " + m + " * " + n + " identity matrix : "
+        + identity.getPath());
+
+    for (int i = 0; i < m; i++) {
+      DenseVector vector = new DenseVector();
+      for (int j = 0; j < n; j++) {
+        vector.set(j, (i == j ? 1.0 : 0.0));
+      }
+      identity.setRow(i, vector);
+    }
+
+    return identity;
+  }
+
+  /**
+   * Gets the double value of (i, j)
+   * 
+   * @param i ith row of the matrix (0-based)
+   * @param j jth column of the matrix (0-based)
+   * @return the value of the entry
+   * @throws IOException
+   * @throws ArrayIndexOutOfBoundsException if i or j is negative, or not
+   *                 smaller than the number of rows / columns
+   * @throws NullPointerException if no cell is stored at (i, j)
+   */
+  public double get(int i, int j) throws IOException {
+    // indices are 0-based, so the dimension itself is already out of range
+    if (i < 0 || j < 0 || i >= this.getRows() || j >= this.getColumns())
+      throw new ArrayIndexOutOfBoundsException(i + ", " + j);
+
+    Cell c = table.get(BytesUtil.getRowIndex(i), BytesUtil.getColumnIndex(j));
+    if (c == null)
+      throw new NullPointerException("Unexpected null");
+
+    return BytesUtil.bytesToDouble(c.getValue());
+  }
+
+  /**
+   * Reads row i from the table, restricted to the Constants.COLUMN family,
+   * and wraps the result in a vector.
+   * 
+   * @param i the row index of the matrix
+   * @return the vector of row
+   * @throws IOException
+   */
+  public DenseVector getRow(int i) throws IOException {
+    final byte[][] families = { Bytes.toBytes(Constants.COLUMN) };
+    return new DenseVector(table.getRow(BytesUtil.getRowIndex(i), families));
+  }
+
+  /**
+   * Gets the vector of column. The whole table is scanned for the single
+   * column j; every row the scanner returns contributes one entry keyed by
+   * its row index.
+   * 
+   * @param j the column index of the matrix
+   * @return the vector of column
+   * @throws IOException
+   */
+  public DenseVector getColumn(int j) throws IOException {
+    byte[] columnKey = BytesUtil.getColumnIndex(j);
+    byte[][] c = { columnKey };
+    Scanner scan = table.getScanner(c, HConstants.EMPTY_START_ROW);
+
+    MapWritable trunk = new MapWritable();
+
+    try {
+      for (RowResult row : scan) {
+        trunk.put(new IntWritable(BytesUtil.bytesToInt(row.getRow())),
+            new DoubleEntry(row.get(columnKey)));
+      }
+    } finally {
+      // release the scanner even when reading a row fails
+      scan.close();
+    }
+
+    return new DenseVector(trunk);
+  }
+
+  /** {@inheritDoc} */
+  public void set(int i, int j, double value) throws IOException {
+    // NOTE(review): this only rejects indices strictly greater than the
+    // dimension, so i == getRows() and negative indices pass - confirm
+    // whether that leniency is intended.
+    if (i > this.getRows() || j > this.getColumns()) {
+      throw new ArrayIndexOutOfBoundsException(i + ", " + j);
+    }
+
+    // a one-cell batch update for row i, committed immediately
+    final VectorUpdate cellUpdate = new VectorUpdate(i);
+    cellUpdate.put(j, value);
+    table.commit(cellUpdate.getBatchUpdate());
+  }
+
+  /**
+   * Writes all entries of the given vector into one row of the matrix with a
+   * single batch update.
+   * 
+   * @param row the row index
+   * @param vector the entries to store in that row
+   * @throws IOException
+   * @throws ArrayIndexOutOfBoundsException if row exceeds the number of rows
+   *                 or the vector is longer than the number of columns
+   */
+  public void setRow(int row, Vector vector) throws IOException {
+    final boolean rowOutOfRange = this.getRows() < row;
+    if (rowOutOfRange || this.getColumns() < vector.size()) {
+      throw new ArrayIndexOutOfBoundsException(row);
+    }
+
+    final VectorUpdate rowUpdate = new VectorUpdate(row);
+    rowUpdate.putAll(vector.getEntries());
+    table.commit(rowUpdate.getBatchUpdate());
+  }
+
+  /**
+   * Writes the entries of the given vector into one column of the matrix.
+   * Each vector entry targets a different row, so one update is committed
+   * per entry.
+   * 
+   * @param column the column index
+   * @param vector the entries to store, keyed by row index
+   * @throws IOException
+   * @throws ArrayIndexOutOfBoundsException if column exceeds the number of
+   *                 columns or the vector is longer than the number of rows
+   */
+  public void setColumn(int column, Vector vector) throws IOException {
+    final boolean columnOutOfRange = this.getColumns() < column;
+    if (columnOutOfRange || this.getRows() < vector.size()) {
+      throw new ArrayIndexOutOfBoundsException(column);
+    }
+
+    for (Map.Entry<Writable, Writable> entry : vector.getEntries().entrySet()) {
+      final int targetRow = ((IntWritable) entry.getKey()).get();
+      final double cellValue = ((DoubleEntry) entry.getValue()).getValue();
+
+      final VectorUpdate cellUpdate = new VectorUpdate(targetRow);
+      cellUpdate.put(column, cellValue);
+      table.commit(cellUpdate.getBatchUpdate());
+    }
+  }
+
+  /**
+   * C = alpha*B + A, computed by a map/reduce job that writes into a newly
+   * created matrix of the same dimension as this one.
+   * 
+   * @param alpha the scalar applied to B
+   * @param B the matrix to add; must have the same dimension as this matrix
+   * @return C
+   * @throws IOException if the dimensions differ or the job fails
+   */
+  public DenseMatrix add(double alpha, Matrix B) throws IOException {
+    ensureForAddition(B);
+
+    final DenseMatrix sum = new DenseMatrix(config, this.getRows(), this
+        .getColumns());
+
+    final JobConf additionJob = new JobConf(config);
+    additionJob.setJobName("addition MR job" + sum.getPath());
+    additionJob.setNumMapTasks(config.getNumMapTasks());
+    additionJob.setNumReduceTasks(config.getNumReduceTasks());
+
+    // map side reads this matrix and B, reduce side writes into 'sum'
+    final String scale = Double.toString(alpha);
+    RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(), scale,
+        RowCyclicAdditionMap.class, IntWritable.class, MapWritable.class,
+        additionJob);
+    RowCyclicAdditionReduce.initJob(sum.getPath(),
+        RowCyclicAdditionReduce.class, additionJob);
+
+    JobManager.execute(additionJob);
+    return sum;
+  }
+
+  /**
+   * C = B + A; shorthand for {@code add(1.0, B)}.
+   * 
+   * @param B the matrix to add
+   * @return C
+   * @throws IOException
+   */
+  public DenseMatrix add(Matrix B) throws IOException {
+    final double unitScale = 1.0;
+    return this.add(unitScale, B);
+  }
+
+  /**
+   * C = A + M1 + M2 + ... : adds all the given matrices to this one in a
+   * single map/reduce job. Every summand is passed with the coefficient "1".
+   * 
+   * @param matrices the matrices to add; each must have the same dimension as
+   *                this matrix
+   * @return C, a newly created matrix
+   * @throws IOException if a dimension differs or the job fails
+   * @throws IllegalArgumentException if no matrix is given
+   */
+  public DenseMatrix add(Matrix... matrices) throws IOException {
+    // Fail fast, before a result table is created, when there is nothing to
+    // add (previously this ended in a StringIndexOutOfBoundsException).
+    if (matrices == null || matrices.length == 0) {
+      throw new IllegalArgumentException(
+          "At least one matrix is required for addition.");
+    }
+
+    // Validate every summand and build the comma separated path / alpha
+    // lists in the same pass.
+    StringBuilder summandList = new StringBuilder();
+    StringBuilder alphaList = new StringBuilder();
+    for (int k = 0; k < matrices.length; k++) {
+      ensureForAddition(matrices[k]);
+      if (k > 0) {
+        summandList.append(",");
+        alphaList.append(",");
+      }
+      summandList.append(matrices[k].getPath());
+      alphaList.append("1");
+    }
+
+    DenseMatrix result = new DenseMatrix(config, this.getRows(), this
+        .getColumns());
+    JobConf jobConf = new JobConf(config);
+    jobConf.setJobName("addition MR job" + result.getPath());
+
+    jobConf.setNumMapTasks(config.getNumMapTasks());
+    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+    RowCyclicAdditionMap.initJob(this.getPath(), summandList.toString(),
+        alphaList.toString(), RowCyclicAdditionMap.class, IntWritable.class,
+        MapWritable.class, jobConf);
+    RowCyclicAdditionReduce.initJob(result.getPath(),
+        RowCyclicAdditionReduce.class, jobConf);
+
+    JobManager.execute(jobConf);
+    return result;
+  }
+
+  private void ensureForAddition(Matrix m) throws IOException {
+    if (getRows() != m.getRows() || getColumns() != m.getColumns()) {
+      throw new IOException(
+          "Matrices' rows and columns should be same while A+B.");
+    }
+  }
+
+  /**
+   * C = A*B using iterative method
+   * 
+   * @param B
+   * @return C
+   * @throws IOException
+   */
+  public DenseMatrix mult(Matrix B) throws IOException {
+    ensureForMultiplication(B);
+    int columns = 0;
+    if(B.getColumns() == 1 || this.getColumns() == 1)
+      columns = 1;
+    else
+      columns = this.getColumns();
+    
+    DenseMatrix result = new DenseMatrix(config, this.getRows(), columns);
+
+    for (int i = 0; i < this.getRows(); i++) {
+      JobConf jobConf = new JobConf(config);
+      jobConf.setJobName("multiplication MR job : " + result.getPath() + " "
+          + i);
+
+      jobConf.setNumMapTasks(config.getNumMapTasks());
+      jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+      DenseMatrixVectorMultMap.initJob(i, this.getPath(), B.getPath(),
+          DenseMatrixVectorMultMap.class, IntWritable.class, MapWritable.class,
+          jobConf);
+      DenseMatrixVectorMultReduce.initJob(result.getPath(),
+          DenseMatrixVectorMultReduce.class, jobConf);
+      JobManager.execute(jobConf);
+    }
+
+    return result;
+  }
+
+  /**
+   * C = A * B using Blocking algorithm.
+   * <p>
+   * The blocks of both operands are first collected into a temporary table
+   * ("collect_" + random name), which is then the input of the block
+   * multiplication job. The temporary table is deleted afterwards, also when
+   * one of the jobs fails.
+   * 
+   * @param B the right-hand operand; its row count must equal the column
+   *                count of this matrix
+   * @param blocks the number of blocks
+   * @return C, a newly created matrix with the rows of A and the columns of B
+   * @throws IOException if the dimensions do not fit or a job fails
+   */
+  public DenseMatrix mult(Matrix B, int blocks) throws IOException {
+    ensureForMultiplication(B);
+
+    String collectionTable = "collect_" + RandomVariable.randMatrixPath();
+    HTableDescriptor desc = new HTableDescriptor(collectionTable);
+    desc.addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK)));
+    this.admin.createTable(desc);
+    LOG.info("Collect Blocks");
+
+    try {
+      // 'true' marks the blocks of this matrix, 'false' those of B
+      collectBlocksMapRed(this.getPath(), collectionTable, blocks, true);
+      collectBlocksMapRed(B.getPath(), collectionTable, blocks, false);
+
+      // A (m x k) times B (k x n) is an m x n matrix
+      DenseMatrix result = new DenseMatrix(config, this.getRows(), B
+          .getColumns());
+
+      JobConf jobConf = new JobConf(config);
+      jobConf.setJobName("multiplication MR job : " + result.getPath());
+
+      jobConf.setNumMapTasks(config.getNumMapTasks());
+      jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+      BlockMultiplyMap.initJob(collectionTable, BlockMultiplyMap.class,
+          BlockID.class, BlockWritable.class, jobConf);
+      BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class,
+          jobConf);
+
+      JobManager.execute(jobConf);
+      return result;
+    } finally {
+      // the collection table is only scratch space; never leave it behind
+      hamaAdmin.delete(collectionTable);
+    }
+  }
+
+  private void ensureForMultiplication(Matrix m) throws IOException {
+    if (getColumns() != m.getRows()) {
+      throw new IOException("A's columns should equal with B's rows while A*B.");
+    }
+  }
+
+  /**
+   * C = alpha*A*B + C
+   * <p>
+   * NOTE: not implemented yet. The arguments are ignored and the method
+   * always returns null.
+   * 
+   * @param alpha
+   * @param B
+   * @param C
+   * @return currently always null
+   * @throws IOException
+   */
+  public Matrix multAdd(double alpha, Matrix B, Matrix C) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  /**
+   * Computes the given norm of the matrix
+   * 
+   * @param type
+   * @return norm of the matrix
+   * @throws IOException
+   */
+  public double norm(Norm type) throws IOException {
+    if (type == Norm.One)
+      return getNorm1();
+    else if (type == Norm.Frobenius)
+      return getFrobenius();
+    else if (type == Norm.Infinity)
+      return getInfinity();
+    else
+      return getMaxvalue();
+  }
+
+  /**
+   * Returns the type of this matrix, i.e. the simple name of its runtime
+   * class.
+   */
+  public String getType() {
+    final Class<?> runtimeClass = getClass();
+    return runtimeClass.getSimpleName();
+  }
+
+  /**
+   * Returns the sub matrix formed by selecting certain rows and columns from a
+   * bigger matrix. The sub matrix is a in-memory operation only.
+   * <p>
+   * Rows are filled in the order the scanner returns them, starting at row 0
+   * of the result.
+   * 
+   * @param i0 the start index of row
+   * @param i1 the end index of row (inclusive)
+   * @param j0 the start index of column
+   * @param j1 the end index of column (inclusive)
+   * @return the sub matrix of matrix
+   * @throws IOException
+   */
+  public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException {
+    int columnSize = (j1 - j0) + 1;
+    SubMatrix result = new SubMatrix((i1 - i0) + 1, columnSize);
+
+    // column keys of j0..j1, computed once and reused for every scanned row
+    byte[][] cols = new byte[columnSize][];
+    for (int j = j0, jj = 0; j <= j1; j++, jj++) {
+      cols[jj] = BytesUtil.getColumnIndex(j);
+    }
+
+    // the stop row of the scan is exclusive, hence i1 + 1
+    Scanner scan = table.getScanner(cols, BytesUtil.getRowIndex(i0), BytesUtil
+        .getRowIndex(i1 + 1));
+    try {
+      int i = 0;
+      for (RowResult rs : scan) {
+        for (int jj = 0; jj < columnSize; jj++) {
+          result.set(i, jj, rs.get(cols[jj]).getValue());
+        }
+        i++;
+      }
+    } finally {
+      // release the scanner even when reading a row fails
+      scan.close();
+    }
+
+    return result;
+  }
+
+  /**
+   * Collect Blocks: runs a map/reduce job that reads the matrix table 'path'
+   * and stores its blocks in 'collectionTable'.
+   * 
+   * @param path a input path
+   * @param collectionTable the collection table
+   * @param blockNum the number of blocks; must be a perfect square, since its
+   *                integer square root is what is handed to
+   *                CollectBlocksMapper.initJob
+   * @param bool flag passed through to CollectBlocksMapper.initJob
+   * @throws IOException if blockNum is not a perfect square or the job fails
+   */
+  public void collectBlocksMapRed(String path, String collectionTable,
+      int blockNum, boolean bool) throws IOException {
+    // Perfect-square test in integer arithmetic (long, to avoid overflow)
+    // instead of inspecting the decimal rendering of a double.
+    int block_size = (int) Math.round(Math.sqrt(blockNum));
+    if (blockNum < 0 || (long) block_size * block_size != blockNum)
+      throw new IOException("can't divide.");
+
+    JobConf jobConf = new JobConf(config);
+    jobConf.setJobName("Blocking MR job" + getPath());
+
+    jobConf.setNumMapTasks(config.getNumMapTasks());
+    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+    jobConf.setMapperClass(CollectBlocksMapper.class);
+    jobConf.setInputFormat(VectorInputFormat.class);
+    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+
+    FileInputFormat.addInputPaths(jobConf, path);
+
+    // NOTE(review): the dimension of THIS matrix is passed even when 'path'
+    // names another matrix - confirm this is intended.
+    CollectBlocksMapper.initJob(collectionTable, bool, block_size, this
+        .getRows(), this.getColumns(), jobConf);
+
+    JobManager.execute(jobConf);
+  }
+
+  /**
+   * Compute all the eigen values. Note: all the eigen values are collected in
+   * the "eival:value" column, and the eigen vector of a specified eigen value
+   * is collected in the "eivec:" column family in the same row.
+   * <p>
+   * Outline: one initialization job, then a loop that (1) finds the pivot
+   * with a map/reduce job, (2) computes the rotation parameters on the
+   * client, (3) updates the two affected eigen values, (4) rotates the matrix
+   * with a map/reduce job, (5) rotates the eigen vectors and (6) refreshes
+   * the index entries of the two pivot rows. The loop ends when 'state'
+   * reaches zero or 'loops' iterations have been performed.
+   * 
+   * TODO: we may need to expose the interface to access the eigen values and
+   * vectors
+   * 
+   * @param loops limit the loops of the computation
+   * @throws IOException
+   */
+  public void jacobiEigenValue(int loops) throws IOException {
+    JobConf jobConf = new JobConf(config);
+
+    /***************************************************************************
+     * Initialization
+     * 
+     * A M/R job is used for initialization(such as, preparing a matrix copy of
+     * the original in "eicol:" family.)
+     **************************************************************************/
+    // initialization
+    jobConf.setJobName("JacobiEigen initialization MR job" + getPath());
+
+    jobConf.setMapperClass(JacobiEigenValue.InitMapper.class);
+    jobConf.setInputFormat(VectorInputFormat.class);
+    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+
+    FileInputFormat.addInputPaths(jobConf, getPath());
+    jobConf.set(JacobiEigenValue.MATRIX, getPath());
+    jobConf.setOutputFormat(NullOutputFormat.class);
+    jobConf.setMapOutputKeyClass(IntWritable.class);
+    jobConf.setMapOutputValueClass(MapWritable.class);
+
+    JobManager.execute(jobConf);
+
+    final FileSystem fs = FileSystem.get(jobConf);
+    // reused holders for the pivot read back from the job output
+    Pair pivotPair = new Pair();
+    DoubleWritable pivotWritable = new DoubleWritable();
+    VectorUpdate vu;
+
+    // loop
+    int size = this.getRows();
+    // starts at the number of rows; adjusted by update() on every rotation
+    int state = size;
+    int pivot_row, pivot_col;
+    double pivot;
+    double s, c, t, y;
+
+    while (state != 0 && loops > 0) {
+      /*************************************************************************
+       * Find the pivot and its index(pivot_row, pivot_col)
+       * 
+       * A M/R job is used to scan all the "eival:ind" to get the max absolute
+       * value of each row, and do a MAX aggregation of these max values to get
+       * the max value in the matrix.
+       ************************************************************************/
+      jobConf = new JobConf(config);
+      jobConf.setJobName("Find Pivot MR job" + getPath());
+
+      // a single reducer, so the result ends up in one output file
+      jobConf.setNumReduceTasks(1);
+
+      // time-stamped scratch directory for this iteration's output
+      Path outDir = new Path(new Path(getType() + "_TMP_FindPivot_dir_"
+          + System.currentTimeMillis()), "out");
+      if (fs.exists(outDir))
+        fs.delete(outDir, true);
+
+      jobConf.setMapperClass(JacobiEigenValue.PivotMapper.class);
+      jobConf.setInputFormat(JacobiEigenValue.PivotInputFormat.class);
+      jobConf.set(JacobiEigenValue.PivotInputFormat.COLUMN_LIST,
+          JacobiEigenValue.EIIND);
+      FileInputFormat.addInputPaths(jobConf, getPath());
+      jobConf.setMapOutputKeyClass(Pair.class);
+      jobConf.setMapOutputValueClass(DoubleWritable.class);
+
+      jobConf.setOutputKeyClass(Pair.class);
+      jobConf.setOutputValueClass(DoubleWritable.class);
+      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+      FileOutputFormat.setOutputPath(jobConf, outDir);
+
+      // update the out put dir of the job
+      outDir = FileOutputFormat.getOutputPath(jobConf);
+
+      JobManager.execute(jobConf);
+
+      // read outputs
+      Path inFile = new Path(outDir, "part-00000");
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+      try {
+        // NOTE(review): the boolean result of next() is ignored; if the file
+        // holds no record the holders keep their previous contents - confirm
+        // that the job always emits exactly one record.
+        reader.next(pivotPair, pivotWritable);
+        pivot_row = pivotPair.getRow();
+        pivot_col = pivotPair.getColumn();
+        pivot = pivotWritable.get();
+      } finally {
+        reader.close();
+      }
+      // remove the whole scratch directory, not just "out"
+      fs.delete(outDir.getParent(), true);
+
+      /*************************************************************************
+       * Calculation
+       * 
+       * Compute the rotation parameters of next rotation.
+       ************************************************************************/
+      // current eigen value estimates stored in the pivot row / pivot column rows
+      double e1 = BytesUtil.bytesToDouble(table.get(
+          BytesUtil.getRowIndex(pivot_row),
+          Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+      double e2 = BytesUtil.bytesToDouble(table.get(
+          BytesUtil.getRowIndex(pivot_col),
+          Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+
+      // c and s are passed on as PIVOTCOS / PIVOTSIN below; t is the amount
+      // by which the two eigen values are shifted.
+      // NOTE(review): t becomes 0 when pivot == 0 and y == 0, which makes the
+      // divisions below produce NaN - confirm this cannot happen.
+      y = (e2 - e1) / 2;
+      t = Math.abs(y) + Math.sqrt(pivot * pivot + y * y);
+      s = Math.sqrt(pivot * pivot + t * t);
+      c = t / s;
+      s = pivot / s;
+      t = (pivot * pivot) / t;
+      if (y < 0) {
+        s = -s;
+        t = -t;
+      }
+
+      /*************************************************************************
+       * Update the pivot and the eigen values indexed by the pivot
+       ************************************************************************/
+      // the pivot element itself is set to zero
+      vu = new VectorUpdate(pivot_row);
+      vu.put(JacobiEigenValue.EICOL, pivot_col, 0);
+      table.commit(vu.getBatchUpdate());
+
+      // shift the two eigen values by -t / +t and track the change state
+      state = update(pivot_row, -t, state);
+      state = update(pivot_col, t, state);
+
+      /*************************************************************************
+       * Rotation the matrix
+       ************************************************************************/
+      // rotation
+      jobConf = new JobConf(config);
+      jobConf.setJobName("Rotation Matrix MR job" + getPath());
+
+      jobConf.setInt(JacobiEigenValue.PIVOTROW, pivot_row);
+      jobConf.setInt(JacobiEigenValue.PIVOTCOL, pivot_col);
+      jobConf.set(JacobiEigenValue.PIVOTSIN, String.valueOf(s));
+      jobConf.set(JacobiEigenValue.PIVOTCOS, String.valueOf(c));
+
+      jobConf.setMapperClass(DummyMapper.class);
+      jobConf.setInputFormat(JacobiEigenValue.RotationInputFormat.class);
+      jobConf.set(JacobiEigenValue.RotationInputFormat.COLUMN_LIST,
+          JacobiEigenValue.EIIND);
+      FileInputFormat.addInputPaths(jobConf, getPath());
+      jobConf.setMapOutputKeyClass(NullWritable.class);
+      jobConf.setMapOutputValueClass(NullWritable.class);
+      // NOTE(review): the same input path is added a second time here - looks
+      // redundant; confirm against RotationInputFormat before removing.
+      FileInputFormat.addInputPaths(jobConf, getPath());
+      jobConf.setOutputFormat(NullOutputFormat.class);
+
+      JobManager.execute(jobConf);
+
+      // rotate eigenvectors
+      LOG.info("rotating eigenvector");
+      for (int i = 0; i < size; i++) {
+        // component i of the eigen vectors stored in the two pivot rows
+        e1 = BytesUtil.bytesToDouble(table.get(
+            BytesUtil.getRowIndex(pivot_row),
+            Bytes.toBytes(JacobiEigenValue.EIVEC + i)).getValue());
+        e2 = BytesUtil.bytesToDouble(table.get(
+            BytesUtil.getRowIndex(pivot_col),
+            Bytes.toBytes(JacobiEigenValue.EIVEC + i)).getValue());
+
+        vu = new VectorUpdate(pivot_row);
+        vu.put(JacobiEigenValue.EIVEC, i, c * e1 - s * e2);
+        table.commit(vu.getBatchUpdate());
+
+        vu = new VectorUpdate(pivot_col);
+        vu.put(JacobiEigenValue.EIVEC, i, s * e1 + c * e2);
+        table.commit(vu.getBatchUpdate());
+      }
+
+      LOG.info("update index...");
+      // update index array
+      maxind(pivot_row, size);
+      maxind(pivot_col, size);
+
+      loops--;
+    }
+  }
+
+  void maxind(int row, int size) throws IOException {
+    int m = row + 1;
+    if (row + 2 < size) {
+      double max = BytesUtil.bytesToDouble(table
+          .get(BytesUtil.getRowIndex(row),
+              Bytes.toBytes(JacobiEigenValue.EICOL + m)).getValue());
+      double val;
+      for (int i = row + 2; i < size; i++) {
+        val = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row),
+            Bytes.toBytes(JacobiEigenValue.EICOL + i)).getValue());
+        if (Math.abs(val) > Math.abs(max)) {
+          m = i;
+          max = val;
+        }
+      }
+    }
+
+    VectorUpdate vu = new VectorUpdate(row);
+    vu.put(JacobiEigenValue.EIIND, m);
+    table.commit(vu.getBatchUpdate());
+  }
+
+  int update(int row, double value, int state) throws IOException {
+    double e = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row),
+        Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+    int changed = BytesUtil.bytesToInt(table.get(BytesUtil.getRowIndex(row),
+        Bytes.toBytes(JacobiEigenValue.EICHANGED)).getValue());
+    double y = e;
+    e += value;
+
+    VectorUpdate vu = new VectorUpdate(row);
+    vu.put(JacobiEigenValue.EIVAL, e);
+    if (changed == 1 && (Math.abs(y - e) < .0000001)) { // y == e) {
+      changed = 0;
+      vu.put(JacobiEigenValue.EICHANGED, changed);
+      state--;
+    } else if (changed == 0 && (Math.abs(y - e) > .0000001)) {
+      changed = 1;
+      vu.put(JacobiEigenValue.EICHANGED, changed);
+      state++;
+    }
+    table.commit(vu.getBatchUpdate());
+    return state;
+  }
+
+  // for test
+  boolean verifyEigenValue(double[] e, double[][] E) throws IOException {
+    boolean success = true;
+    double e1, ev;
+    for (int i = 0; i < e.length; i++) {
+      e1 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(i),
+          Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+      success &= ((Math.abs(e1 - e[i]) < .0000001));
+      if (!success)
+        return success;
+
+      for (int j = 0; j < E[i].length; j++) {
+        ev = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(i),
+            Bytes.toBytes(JacobiEigenValue.EIVEC + j)).getValue());
+        success &= ((Math.abs(ev - E[i][j]) < .0000001));
+        if (!success)
+          return success;
+      }
+    }
+    return success;
+  }
+}

Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/DenseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/DenseVector.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/DenseVector.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/DenseVector.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,310 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.log4j.Logger;
+
+/**
+ * This class represents a dense vector.
+ */
+public class DenseVector extends AbstractVector implements Vector {
+  static final Logger LOG = Logger.getLogger(DenseVector.class);
+
+  public DenseVector() {
+    this(new MapWritable());
+  }
+
+  public DenseVector(MapWritable m) {
+    this.entries = m;
+  }
+
+  public DenseVector(RowResult row) {
+    this.initMap(row);
+  }
+
+  public DenseVector(int row, MapWritable m) {
+    this.entries = m;
+    this.entries.put(Constants.ROWCOUNT, new IntWritable(row));
+  }
+
+  /**
+   * Sets the value of index
+   * 
+   * @param index
+   * @param value
+   */
+  public void set(int index, double value) {
+    // If entries are null, create new object
+    if (this.entries == null) {
+      this.entries = new MapWritable();
+    }
+
+    this.entries.put(new IntWritable(index), new DoubleEntry(value));
+  }
+
+  /**
+   * Sets the vector
+   * 
+   * @param v
+   * @return x = v
+   */
+  public DenseVector set(Vector v) {
+    this.set(1, v);
+    return this;
+  }
+
+  public Vector set(double alpha, Vector v) {
+    checkComformantSize(v);
+    boolean zeroFill = false;
+    if(alpha == 0) 
+      zeroFill = true;
+    
+    for (Map.Entry<Writable, Writable> e : v.getEntries().entrySet()) {
+      int key = ((IntWritable) e.getKey()).get();
+      if(zeroFill)
+        this.set(key, 0);
+      else
+        this.set(key, alpha * ((DoubleEntry) e.getValue()).getValue());
+    }
+    
+    return this;
+  }
+  
+  public void setRow(int row) {
+    this.entries.put(Constants.ROWCOUNT, new IntWritable(row));
+  }
+
+  /**
+   * Gets the value of index
+   * 
+   * @param index
+   * @return the value of v(index)
+   * @throws IOException
+   */
+  public double get(int index) {
+    double value;
+    try {
+      value = ((DoubleEntry) this.entries.get(new IntWritable(index)))
+          .getValue();
+    } catch (NullPointerException e) {
+      throw new NullPointerException("Unexpected null value : " + e.toString());
+    }
+
+    return value;
+  }
+  
+  public int getRow() {
+    return ((IntWritable) this.entries.get(Constants.ROWCOUNT)).get();
+  }
+
+  /**
+   * Adds the value to v(index)
+   * 
+   * @param index
+   * @param value
+   */
+  public void add(int index, double value) {
+    set(index, get(index) + value);
+  }
+  
+  /**
+   * x = alpha*v + x
+   * 
+   * @param alpha
+   * @param v
+   * @return x = alpha*v + x
+   */
+  public DenseVector add(double alpha, Vector v) {
+    checkComformantSize(v);
+    if (alpha == 0)
+      return this;
+
+    for (Map.Entry<Writable, Writable> e : this.getEntries().entrySet()) {
+      int key = ((IntWritable) e.getKey()).get();
+      this.add(key, alpha * v.get(key));
+    }
+
+    return this;
+  }
+
+  /**
+   * x = v + x
+   * 
+   * @param v2
+   * @return x = v + x
+   */
+  public DenseVector add(Vector v2) {
+    checkComformantSize(v2);
+
+    for (Map.Entry<Writable, Writable> e : this.getEntries().entrySet()) {
+      int key = ((IntWritable) e.getKey()).get();
+      this.add(key, v2.get(key));
+    }
+
+    return this;
+  }
+
+  /**
+   * x dot v
+   * 
+   * @param v
+   * @return x dot v
+   */
+  public double dot(Vector v) {
+    checkComformantSize(v);
+    
+    double cosine = 0.0;
+    double q_i, d_i;
+    for (int i = 0; i < Math.min(this.size(), v.size()); i++) {
+      q_i = v.get(i);
+      d_i = this.get(i);
+      cosine += q_i * d_i;
+    }
+    return cosine / (this.getNorm2() * ((DenseVector) v).getNorm2());
+  }
+
+  /**
+   * v = alpha*v
+   * 
+   * @param alpha
+   * @return v = alpha*v
+   */
+  public DenseVector scale(double alpha) {
+    for (Map.Entry<Writable, Writable> e : this.entries.entrySet()) {
+      this.entries.put(e.getKey(), new DoubleEntry(((DoubleEntry) e.getValue())
+          .getValue()
+          * alpha));
+    }
+    return this;
+  }
+
+  /**
+   * Computes the given norm of the vector
+   * 
+   * @param type
+   * @return norm of the vector
+   */
+  public double norm(Norm type) {
+    if (type == Norm.One)
+      return getNorm1();
+    else if (type == Norm.Two)
+      return getNorm2();
+    else if (type == Norm.TwoRobust)
+      return getNorm2Robust();
+    else
+      return getNormInf();
+  }
+
+  protected double getNorm1() {
+    double sum = 0.0;
+
+    Set<Writable> keySet = this.entries.keySet();
+    Iterator<Writable> it = keySet.iterator();
+
+    while (it.hasNext()) {
+      sum += get(((IntWritable) it.next()).get());
+    }
+
+    return sum;
+  }
+
+  protected double getNorm2() {
+    double square_sum = 0.0;
+
+    Set<Writable> keySet = entries.keySet();
+    Iterator<Writable> it = keySet.iterator();
+
+    while (it.hasNext()) {
+      double value = get(((IntWritable) it.next()).get());
+      square_sum += value * value;
+    }
+
+    return Math.sqrt(square_sum);
+  }
+
+  /**
+   * Returns the robust norm of the vector
+   * 
+   * @return the robust norm of the vector
+   */
+  protected double getNorm2Robust() {
+    double scale = 0, ssq = 1;
+    for (int i = 0; i < this.size(); i++) {
+      double val = get(i);
+      if (val != 0) {
+        double absxi = Math.abs(val);
+        if (scale < absxi) {
+          ssq = 1 + ssq * Math.pow(scale / absxi, 2);
+          scale = absxi;
+        } else
+          ssq = ssq + Math.pow(absxi / scale, 2);
+      }
+    }
+    return scale * Math.sqrt(ssq);
+  }
+
+  /**
+   * Returns the infinity norm of the vector
+   * 
+   * @return the infinity norm of the vector
+   */
+  protected double getNormInf() {
+    double max = 0.0;
+    for (int i = 0; i < this.size(); i++) {
+      max = Math.max(max, Math.abs(get(i)));
+    }
+    return max;
+  }
+
+  /**
+   * Returns a sub-vector.
+   * 
+   * @param i0 the index of the first element
+   * @param i1 the index of the last element
+   * @return v[i0:i1]
+   */
+  public DenseVector subVector(int i0, int i1) {
+    DenseVector res = new DenseVector();
+    if (this.entries.containsKey(Constants.ROWCOUNT))
+      res.setRow(this.getRow());
+
+    for (int i = i0; i <= i1; i++) {
+      res.set(i, get(i));
+    }
+
+    return res;
+  }
+
+  public void zeroFill(int size) {
+    for(int i = 0; i < size; i++) {
+      this.set(i, 0);
+    }
+  }
+}

Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/HamaAdmin.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/HamaAdmin.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/HamaAdmin.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/HamaAdmin.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+
+/**
+ * An administration interface to manage the matrix's namespace, and table
+ * allocation and garbage collection.
+ */
+public interface HamaAdmin {
+
+  /**
+   * Saves the matrix under the name given by 'aliaseName'.
+   * 
+   * @param matrix the matrix to save
+   * @param aliaseName the name to save the matrix under
+   * @return true if the matrix was saved
+   */
+  boolean save(Matrix matrix, String aliaseName);
+
+  /**
+   * Looks up the physical path of a matrix.
+   * 
+   * @param name the name of the matrix
+   * @return the physical path of the matrix
+   */
+  String getPath(String name);
+
+  /**
+   * Tells whether a matrix exists.
+   * 
+   * @param matrixName the name of the matrix
+   * @return true if the matrix exists
+   */
+  boolean matrixExists(String matrixName);
+
+  /**
+   * Deletes the matrix.
+   * 
+   * @param matrixName the name of the matrix to delete
+   * @throws IOException
+   */
+  void delete(String matrixName) throws IOException;
+
+  /**
+   * Loads the matrix.
+   * 
+   * @param matrixName the name of the matrix to load
+   * @return the matrix
+   * @throws IOException
+   */
+  Matrix getMatrix(String matrixName) throws IOException;
+
+}

Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/HamaAdminImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/HamaAdminImpl.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/HamaAdminImpl.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/HamaAdminImpl.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,214 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+
+/**
+ * An Implementation of {@link org.apache.hama.HamaAdmin} to manage the matrix's
+ * namespace, and table allocation & garbage collection.
+ */
+public class HamaAdminImpl implements HamaAdmin {
+  static final Logger LOG = Logger.getLogger(HamaAdminImpl.class);
+  protected HamaConfiguration conf;
+  protected HBaseAdmin admin;
+  protected HTable table;
+
+  /**
+   * Constructor
+   * 
+   * @param conf
+   * @throws MasterNotRunningException
+   */
+  public HamaAdminImpl(HamaConfiguration conf) throws MasterNotRunningException {
+    this.conf = conf;
+    this.admin = new HBaseAdmin(conf);
+    initialJob();
+  }
+
+  /**
+   * Constructor
+   * 
+   * @param conf
+   * @param admin
+   */
+  public HamaAdminImpl(HamaConfiguration conf, HBaseAdmin admin) {
+    this.conf = conf;
+    this.admin = admin;
+    initialJob();
+  }
+
+  /**
+   * Initializing the admin.
+   */
+  private void initialJob() {
+    try {
+      if (!admin.tableExists(Constants.ADMINTABLE)) {
+        HTableDescriptor tableDesc = new HTableDescriptor(Constants.ADMINTABLE);
+        tableDesc.addFamily(new HColumnDescriptor(Constants.PATHCOLUMN));
+        admin.createTable(tableDesc);
+      }
+
+      table = new HTable(conf, Constants.ADMINTABLE);
+      table.setAutoFlush(true);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * @param name
+   * @return real table name
+   */
+  public String getPath(String name) {
+    try {
+      byte[] result = table.get(name, Constants.PATHCOLUMN).getValue();
+      return Bytes.toString(result);
+    } catch (IOException e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
+
+  public boolean matrixExists(String matrixName) {
+    try {
+      Cell result = table.get(matrixName, Constants.PATHCOLUMN);
+      return (result == null) ? false : true;
+    } catch (IOException e) {
+      e.printStackTrace();
+      return false;
+    }
+  }
+
+  public boolean save(Matrix mat, String aliaseName) {
+    boolean result = false;
+
+    // we just store the name -> path(tablename) here.
+    // the matrix type is stored in its hbase table. we don't need to store
+    // again.
+    BatchUpdate update = new BatchUpdate(aliaseName);
+    update.put(Constants.PATHCOLUMN, Bytes.toBytes(mat.getPath()));
+
+    try {
+      table.commit(update);
+
+      result = true;
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    return result;
+  }
+
+  /** remove the entry of 'matrixName' in admin table. * */
+  private void removeEntry(String matrixName) throws IOException {
+    table.deleteAll(matrixName);
+  }
+
+  private int getReference(String tableName) throws IOException {
+    HTable matrix = new HTable(conf, tableName);
+
+    Cell rows = null;
+    rows = matrix.get(Constants.METADATA, Constants.METADATA_REFERENCE);
+
+    return (rows == null) ? 0 : Bytes.toInt(rows.getValue());
+  }
+
+  private void clearAliaseInfo(String tableName) throws IOException {
+    HTable matrix = new HTable(conf, tableName);
+
+    matrix.deleteAll(Constants.METADATA, Constants.ALIASENAME);
+  }
+
+  /**
+   * we remove the aliase entry store in Admin table, and clear the aliase info
+   * store in matrix table. And check the reference of the matrix table:
+   * 
+   * 1) if the reference of the matrix table is zero: we delete the table. 2) if
+   * the reference of the matrix table is not zero: we let the matrix who still
+   * reference the table to do the garbage collection.
+   */
+  public void delete(String matrixName) throws IOException {
+    if (matrixExists(matrixName)) {
+      String tablename = getPath(matrixName);
+
+      // i) remove the aliase entry first.
+      removeEntry(matrixName);
+
+      if (tablename == null) { // a matrixName point to a null table. we delete
+        // the entry.
+        return;
+      }
+
+      if (!admin.tableExists(tablename)) { // have not specified table.
+        return;
+      }
+
+      // ii) clear the aliase info store in matrix table.
+      clearAliaseInfo(tablename);
+
+      if (getReference(tablename) <= 0) { // no reference, do gc!!
+        if (admin.isTableEnabled(tablename)) {
+          while (admin.isTableEnabled(tablename)) {
+            try {
+              admin.disableTable(tablename);
+            } catch (RegionException e) {
+              LOG.warn(e);
+            }
+          }
+
+          admin.deleteTable(tablename);
+        }
+      }
+    }
+  }
+
+  @Override
+  public Matrix getMatrix(String matrixName) throws IOException {
+    String path = getPath(matrixName);
+    if(getType(path).equals("SparseMatrix"))
+      return new SparseMatrix(conf, path);
+    else
+      return new DenseMatrix(conf, path);
+  }
+
+  private String getType(String path) {
+    try {
+      HTable matrix = new HTable(conf, path);
+      byte[] result = matrix.get(Constants.METADATA,
+          Constants.ATTRIBUTE + "type").getValue();
+      return Bytes.toString(result);
+    } catch (IOException e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
+}

Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/HamaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/HamaConfiguration.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/HamaConfiguration.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/HamaConfiguration.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,87 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+/**
+ * Adds Hama configuration files to a Configuration
+ */
+public class HamaConfiguration extends HBaseConfiguration {
+  /** The property holding the number of map tasks. */
+  private static final String NUM_MAP_TASKS = "mapred.map.tasks";
+
+  /** The property holding the number of reduce tasks. */
+  private static final String NUM_REDUCE_TASKS = "mapred.reduce.tasks";
+
+  /** constructor */
+  public HamaConfiguration() {
+    super();
+    addHamaResources();
+  }
+
+  /**
+   * Create a clone of passed configuration.
+   * 
+   * Every entry of the passed configuration is set on top of a freshly
+   * created Hama configuration.
+   * 
+   * @param c Configuration to clone.
+   */
+  public HamaConfiguration(final Configuration c) {
+    this();
+    for (Entry<String, String> e : c) {
+      set(e.getKey(), e.getValue());
+    }
+  }
+
+  /**
+   * Sets the number of map task
+   * 
+   * @param map the number of map tasks
+   */
+  public void setNumMapTasks(int map) {
+    setInt(NUM_MAP_TASKS, map);
+  }
+
+  /**
+   * Sets the number of reduce task
+   * 
+   * @param reduce the number of reduce tasks
+   */
+  public void setNumReduceTasks(int reduce) {
+    setInt(NUM_REDUCE_TASKS, reduce);
+  }
+
+  /**
+   * Gets the number of map task
+   * 
+   * @return the configured number of map tasks, or 1 if the property is not
+   *         set
+   */
+  public int getNumMapTasks() {
+    // getInt supplies a default instead of failing on an unset property.
+    return getInt(NUM_MAP_TASKS, 1);
+  }
+
+  /**
+   * Gets the number of reduce task
+   * 
+   * @return the configured number of reduce tasks, or 1 if the property is
+   *         not set
+   */
+  public int getNumReduceTasks() {
+    return getInt(NUM_REDUCE_TASKS, 1);
+  }
+
+  /**
+   * Adds Hama configuration files to a Configuration
+   */
+  private void addHamaResources() {
+    addResource("hama-site.xml");
+  }
+}

Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Matrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Matrix.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Matrix.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Matrix.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,287 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+
+/**
+ * The basic interface of a matrix.
+ */
+public interface Matrix {
+
+  /**
+   * Returns the double value of the entry (i, j).
+   * 
+   * @param i the row index of the entry
+   * @param j the column index of the entry
+   * @return the value of the entry
+   * @throws IOException
+   */
+  double get(int i, int j) throws IOException;
+
+  /**
+   * Returns a row of the matrix as a vector.
+   * 
+   * @param i the row index of the matrix
+   * @return the vector of the row
+   * @throws IOException
+   */
+  Vector getRow(int i) throws IOException;
+
+  /**
+   * Returns a column of the matrix as a vector.
+   * 
+   * @param j the column index of the matrix
+   * @return the vector of the column
+   * @throws IOException
+   */
+  Vector getColumn(int j) throws IOException;
+
+  /**
+   * Returns the number of rows of the matrix, taken from the meta-data column.
+   * 
+   * @return the number of rows of the matrix
+   * @throws IOException
+   */
+  int getRows() throws IOException;
+
+  /**
+   * Returns the number of columns of the matrix, taken from the meta-data
+   * column.
+   * 
+   * @return the number of columns of the matrix
+   * @throws IOException
+   */
+  int getColumns() throws IOException;
+
+  /**
+   * Returns the label of a row.
+   * 
+   * @param i the row index
+   * @return the label of the row
+   * @throws IOException
+   */
+  String getRowLabel(int i) throws IOException;
+
+  /**
+   * Returns the label of a column.
+   * 
+   * @param j the column index
+   * @return the label of the column
+   * @throws IOException
+   */
+  String getColumnLabel(int j) throws IOException;
+
+  /**
+   * Returns the path of the matrix.
+   * (in hbase, path is the tablename. in filesystem, path may be a file path.)
+   * 
+   * @return the name of the matrix
+   */
+  String getPath();
+
+  /**
+   * Assigns a label to a row.
+   * 
+   * @param i the row index
+   * @param name the label of the row
+   * @throws IOException
+   */
+  void setRowLabel(int i, String name) throws IOException;
+
+  /**
+   * Assigns a label to a column.
+   * 
+   * @param j the column index
+   * @param name the label of the column
+   * @throws IOException
+   */
+  void setColumnLabel(int j, String name) throws IOException;
+
+  /**
+   * Stores the double value of the entry (i, j).
+   * 
+   * @param i the row index of the entry
+   * @param j the column index of the entry
+   * @param value the value of the entry
+   * @throws IOException
+   */
+  void set(int i, int j, double value) throws IOException;
+
+  /**
+   * A = alpha*B
+   * 
+   * @param alpha the scale factor
+   * @param B the source matrix
+   * @return A
+   * @throws IOException
+   */
+  Matrix set(double alpha, Matrix B) throws IOException;
+
+  /**
+   * A = B
+   * 
+   * @param B the source matrix
+   * @return A
+   * @throws IOException
+   */
+  Matrix set(Matrix B) throws IOException;
+
+  /**
+   * Replaces a row of the matrix with the given vector.
+   * 
+   * @param row the row index
+   * @param vector the new content of the row
+   * @throws IOException
+   */
+  void setRow(int row, Vector vector) throws IOException;
+
+  /**
+   * Replaces a column of the matrix with the given vector.
+   * 
+   * @param column the column index
+   * @param vector the new content of the column
+   * @throws IOException
+   */
+  void setColumn(int column, Vector vector) throws IOException;
+
+  /**
+   * Sets the dimension of the matrix.
+   * 
+   * @param rows the number of rows
+   * @param columns the number of columns
+   * @throws IOException
+   */
+  void setDimension(int rows, int columns) throws IOException;
+
+  /**
+   * A(i, j) += value
+   * 
+   * @param i the row index of the entry
+   * @param j the column index of the entry
+   * @param value the value to add to the entry
+   * @throws IOException
+   */
+  void add(int i, int j, double value) throws IOException;
+
+  /**
+   * A = B + A
+   * 
+   * @param B the matrix to add
+   * @return A
+   * @throws IOException
+   */
+  Matrix add(Matrix B) throws IOException;
+
+  /**
+   * A = alpha*B + A
+   * 
+   * @param alpha the scale factor applied to B
+   * @param B the matrix to add
+   * @return A
+   * @throws IOException
+   */
+  Matrix add(double alpha, Matrix B) throws IOException;
+
+  /**
+   * C = A*B
+   * 
+   * @param B the right-hand operand
+   * @return C
+   * @throws IOException
+   */
+  Matrix mult(Matrix B) throws IOException;
+
+  /**
+   * C = alpha*A*B + C
+   * 
+   * @param alpha the scale factor applied to the product
+   * @param B the right-hand operand
+   * @param C the matrix the scaled product is added to
+   * @return C
+   * @throws IOException
+   */
+  Matrix multAdd(double alpha, Matrix B, Matrix C) throws IOException;
+
+  /**
+   * Computes the requested norm of the matrix.
+   * 
+   * @param type the norm to compute
+   * @return norm of the matrix
+   * @throws IOException
+   */
+  double norm(Norm type) throws IOException;
+
+  /**
+   * The matrix norms that are supported.
+   */
+  enum Norm {
+    /** Maximum absolute column sum */
+    One,
+
+    /** The root of the sum of the squares of the entries */
+    Frobenius,
+
+    /** The maximum absolute row sum */
+    Infinity,
+
+    /** Largest entry in absolute value. */
+    Maxvalue
+  }
+
+  /**
+   * Transposes the matrix. In most cases, the matrix must be square
+   * for this to work.
+   * 
+   * @return the transposed matrix
+   * @throws IOException
+   */
+  Matrix transpose() throws IOException;
+
+  /**
+   * Saves the matrix to a table or file.
+   * 
+   * @param path the path to save the matrix to
+   * @return true if saved
+   * @throws IOException
+   */
+  boolean save(String path) throws IOException;
+
+  /**
+   * Returns the type of the matrix.
+   * 
+   * @return the matrix type
+   */
+  String getType();
+
+  /**
+   * Returns the sub matrix formed by selecting certain rows and
+   * columns from a bigger matrix. The sub matrix is an in-memory operation
+   * only.
+   * 
+   * @param i0 the start index of row
+   * @param i1 the end index of row
+   * @param j0 the start index of column
+   * @param j1 the end index of column
+   * @return the sub matrix of matrix
+   * @throws IOException
+   */
+  SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException;
+
+  /**
+   * Closes the current matrix.
+   * 
+   * @throws IOException
+   */
+  void close() throws IOException;
+}



Mime
View raw message