Return-Path: Delivered-To: apmail-incubator-hama-commits-archive@locus.apache.org Received: (qmail 14962 invoked from network); 29 Jul 2008 09:34:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 29 Jul 2008 09:34:20 -0000 Received: (qmail 63365 invoked by uid 500); 29 Jul 2008 09:34:20 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 63350 invoked by uid 500); 29 Jul 2008 09:34:19 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@ Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 63339 invoked by uid 99); 29 Jul 2008 09:34:19 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Jul 2008 02:34:19 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Jul 2008 09:33:25 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D342E238896E; Tue, 29 Jul 2008 02:33:21 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r680652 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/ src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/ src/test/org/apache/hama/ src/test/org/apache/hama/mapred/ Date: Tue, 29 Jul 2008 09:33:20 -0000 To: hama-commits@incubator.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080729093321.D342E238896E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: edwardyoon Date: Tue Jul 29 02:33:18 2008 New Revision: 680652 URL: http://svn.apache.org/viewvc?rev=680652&view=rev Log: https://issues.apache.org/jira/browse/HAMA-10 Added: incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java incubator/hama/trunk/src/java/org/apache/hama/Constants.java incubator/hama/trunk/src/java/org/apache/hama/Vector.java incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java incubator/hama/trunk/src/java/org/apache/hama/io/ incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java incubator/hama/trunk/src/java/org/apache/hama/mapred/ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java incubator/hama/trunk/src/test/org/apache/hama/TestVector.java Removed: incubator/hama/trunk/src/java/org/apache/hama/HamaConstants.java incubator/hama/trunk/src/test/org/apache/hama/TestFeatureVector.java Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java incubator/hama/trunk/src/java/org/apache/hama/Matrix.java incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=680652&r1=680651&r2=680652&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Tue Jul 29 02:33:18 2008 @@ -2,6 +2,8 @@ Trunk (unreleased changes) + HAMA-10: Refactor the mapred package for the latest version of dependencies (edwardyoon) + HAMA-9: Upgrade dependencies (edwardyoon) HAMA-6: Add a 'who we are' page (edwardyoon) HAMA-7: Add some information for a new comer (edwardyoon) HAMA-1: Create the Hama web site (edwardyoon via Ian Holsman) Added: incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java?rev=680652&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java Tue Jul 29 02:33:18 2008 @@ -0,0 +1,15 @@ +package org.apache.hama; + +public abstract class AbstractBase { + /** + * Return the integer column index + * + * @param b key + * @return integer + */ + public int getColumnIndex(byte[] b) { + String cKey = new String(b); + return Integer.parseInt(cKey + .substring(cKey.indexOf(":") + 1, cKey.length())); + } +} Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=680652&r1=680651&r2=680652&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Tue Jul 29 02:33:18 2008 @@ -38,7 +38,7 @@ /** * Methods of the matrix classes */ -public abstract class AbstractMatrix implements MatrixInterface { +public abstract class AbstractMatrix extends AbstractBase implements MatrixInterface { static final Logger LOG = Logger.getLogger(AbstractMatrix.class); /** Hbase Configuration */ @@ -77,7 +77,7 @@ */ protected void create() { try { - tableDesc.addFamily(new HColumnDescriptor(HamaConstants.METADATA + tableDesc.addFamily(new HColumnDescriptor(Constants.METADATA .toString())); LOG.info("Initializaing."); admin.createTable(tableDesc); @@ -90,7 +90,7 @@ public int getRowDimension() { Cell rows = null; try { - rows = table.get(HamaConstants.METADATA, HamaConstants.METADATA_ROWS); + rows = table.get(Constants.METADATA, Constants.METADATA_ROWS); } catch (IOException e) { LOG.error(e, e); } @@ -102,8 +102,8 @@ public int getColumnDimension() { Cell columns = null; try { - columns = table.get(HamaConstants.METADATA, - HamaConstants.METADATA_COLUMNS); + columns = table.get(Constants.METADATA, + Constants.METADATA_COLUMNS); } catch (IOException e) { LOG.error(e, e); } @@ -113,7 +113,7 @@ /** {@inheritDoc} */ public double get(int i, int j) { Text row = new Text(String.valueOf(i)); - Text column = new Text(HamaConstants.COLUMN + String.valueOf(j)); + Text column = new Text(Constants.COLUMN + String.valueOf(j)); Cell c; double result = -1; try { @@ -157,7 +157,7 @@ /** {@inheritDoc} */ public void set(int i, int j, double d) { BatchUpdate b = new BatchUpdate(new Text(String.valueOf(i))); - b.put(new Text(HamaConstants.COLUMN + String.valueOf(j)), doubleToBytes(d)); + b.put(new Text(Constants.COLUMN + String.valueOf(j)), doubleToBytes(d)); try { table.commit(b); } catch (IOException e) { @@ -187,9 +187,9 @@ /** {@inheritDoc} */ public void setDimension(int rows, int columns) { - BatchUpdate b = new BatchUpdate(HamaConstants.METADATA); - b.put(HamaConstants.METADATA_ROWS, Bytes.toBytes(rows)); - b.put(HamaConstants.METADATA_COLUMNS, Bytes.toBytes(columns)); + BatchUpdate b = new BatchUpdate(Constants.METADATA); + b.put(Constants.METADATA_ROWS, Bytes.toBytes(rows)); + b.put(Constants.METADATA_COLUMNS, Bytes.toBytes(columns)); try { table.commit(b); @@ -211,8 +211,8 @@ public double getDeterminant() { try { return bytesToDouble(table.get( - new Text(String.valueOf(HamaConstants.DETERMINANT)), - new Text(HamaConstants.COLUMN)).getValue()); + new Text(String.valueOf(Constants.DETERMINANT)), + new Text(Constants.COLUMN)).getValue()); } catch (IOException e) { LOG.error(e, e); return -1; @@ -245,16 +245,4 @@ LOG.error(e, e); } } - - /** - * Return the integer column index - * - * @param b key - * @return integer - */ - public int getColumnIndex(byte[] b) { - String cKey = new String(b); - return Integer.parseInt(cKey - .substring(cKey.indexOf(":") + 1, cKey.length())); - } } Added: incubator/hama/trunk/src/java/org/apache/hama/Constants.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=680652&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Tue Jul 29 02:33:18 2008 @@ -0,0 +1,62 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + +import org.apache.hadoop.io.Text; + +/** + * Some constants used in the hama + */ +public class Constants { + /** Meta-columnfamily to store the matrix-info */ + public final static Text METADATA = new Text("metadata:"); + /** The number of the matrix rows */ + public final static Text METADATA_ROWS = new Text("metadata:rows"); + /** The number of the matrix columns */ + public final static Text METADATA_COLUMNS = new Text("metadata:columns"); + /** The type of the matrix */ + public final static Text METADATA_TYPE = new Text("metadata:type"); + + /** plus operator */ + public final static String PLUS = "+"; + /** minus operator */ + public final static String MINUS = "-"; + + /** Default columnfamily name */ + public final static Text COLUMN = new Text("column:"); + /** The numerator version of the fraction matrix */ + public final static Text NUMERATOR = new Text("numerator:"); + /** The denominator version of the fration matrix */ + public final static Text DENOMINATOR = new Text("denominator:"); + /** The original version of the fraction matrix */ + public final static Text ORIGINAL = new Text("original:"); + /** The lower matrix version of the triangular matrix */ + public final static Text LOWER = new Text("lower:"); + /** The upper matrix version of the triangular matrix */ + public final static Text UPPER = new Text("upper:"); + + /** A determinant value record */ + public final static Text DETERMINANT = new Text("determinant"); + + /** Temporary random matices name-head */ + public final static String RANDOM = "rand"; + /** Temporary result matices name-head */ + public final static String RESULT = "result"; +} Modified: incubator/hama/trunk/src/java/org/apache/hama/Matrix.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Matrix.java?rev=680652&r1=680651&r2=680652&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/Matrix.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/Matrix.java Tue Jul 29 02:33:18 2008 @@ -55,7 +55,7 @@ if (!admin.tableExists(matrixName)) { tableDesc = new HTableDescriptor(matrixName.toString()); - tableDesc.addFamily(new HColumnDescriptor(HamaConstants.COLUMN + tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN .toString())); create(); } @@ -82,7 +82,7 @@ if (!admin.tableExists(matrixName)) { tableDesc = new HTableDescriptor(matrixName.toString()); - tableDesc.addFamily(new HColumnDescriptor(HamaConstants.COLUMN + tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN .toString())); create(); } @@ -154,12 +154,12 @@ /** {@inheritDoc} */ public Matrix addition(Matrix b) { - return additionSubtraction(b, HamaConstants.PLUS); + return additionSubtraction(b, Constants.PLUS); } /** {@inheritDoc} */ public Matrix subtraction(Matrix b) { - return additionSubtraction(b, HamaConstants.PLUS); + return additionSubtraction(b, Constants.PLUS); } /** Modified: incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java?rev=680652&r1=680651&r2=680652&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java Tue Jul 29 02:33:18 2008 @@ -56,7 +56,7 @@ * @return random name */ protected static Text randMatrixName() { - String rName = HamaConstants.RANDOM; + String rName = Constants.RANDOM; for (int i = 1; i <= 5; i++) { char ch = (char) ((Math.random() * 26) + 97); rName += ch; Added: incubator/hama/trunk/src/java/org/apache/hama/Vector.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Vector.java?rev=680652&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/Vector.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/Vector.java Tue Jul 29 02:33:18 2008 @@ -0,0 +1,127 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.HbaseMapWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hama.io.VectorDatum; +import org.apache.log4j.Logger; + +public class Vector extends AbstractBase implements VectorInterface { + static final Logger LOG = Logger.getLogger(Vector.class); + protected int[] m_dims; + protected double[] m_vals; + + public Vector(RowResult r) { + parse(r.entrySet()); + } + + public Vector(VectorDatum r) { + parse(r.entrySet()); + } + + private void parse(Set> entrySet) { + SortedMap m = new TreeMap(); + for (Map.Entry f : entrySet) { + m.put(getColumnIndex(f.getKey()), Double.parseDouble(Bytes.toString(f + .getValue().getValue()))); + } + + this.m_dims = new int[m.keySet().size()]; + this.m_vals = new double[m.keySet().size()]; + + int i = 0; + for (Map.Entry f : m.entrySet()) { + this.m_dims[i] = f.getKey(); + this.m_vals[i] = f.getValue(); + i++; + } + } + + /** + * Returns the cosine similarity between two feature vectors. + */ + public double getCosine(Vector v) { + double cosine = 0.0; + int dim; + double q_i, d_i; + for (int i = 0; i < Math.min(this.size(), v.size()); i++) { + dim = v.getDimAt(i); + q_i = v.getValueAt(dim); + d_i = this.getValueAt(dim); + cosine += q_i * d_i; + } + return cosine / (this.getL2Norm() * v.getL2Norm()); + } + + /** + * Returns the linear norm factor of this vector's values (i.e., the sum of + * it's values). + */ + public double getL1Norm() { + double sum = 0.0; + for (int i = 0; i < m_vals.length; i++) { + sum += m_vals[i]; + } + return sum; + } + + /** + * Returns the L2 norm factor of this vector's values. + */ + public double getL2Norm() { + double square_sum = 0.0; + for (int i = 0; i < m_vals.length; i++) { + square_sum += (m_vals[i] * m_vals[i]); + } + return Math.sqrt(square_sum); + } + + public int getDimAt(int index) { + return m_dims[index]; + } + + public double getValueAt(int index) { + return m_vals[index]; + } + + public int size() { + return m_dims.length; + } + + public VectorDatum addition(byte[] bs, Vector v2) { + HbaseMapWritable trunk = new HbaseMapWritable(); + for (int i = 0; i < this.size(); i++) { + double value = (this.getValueAt(i) + v2.getValueAt(i)); + Cell cValue = new Cell(String.valueOf(value), 0); + trunk.put(Bytes.toBytes("column:" + i), cValue); + } + + return new VectorDatum(bs, trunk); + } +} Added: incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java?rev=680652&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java Tue Jul 29 02:33:18 2008 @@ -0,0 +1,30 @@ +package org.apache.hama; + +import org.apache.hama.io.VectorDatum; + +/** + * A vector. Features are dimension-value pairs. This class implements a + * simple dictionary data structure to map dimensions onto their values. Note + * that for convenience, features do not have be sorted according to their + * dimensions at this point. The SVMLightTrainer class has an option for sorting + * input vectors prior to training. + */ +public interface VectorInterface { + + public double getValueAt(int index); + + public int getDimAt(int index); + + public int size(); + + public double getL1Norm(); + + public double getL2Norm(); + + public double getCosine(Vector v); + + public VectorDatum addition(byte[] bs, Vector v2); + + // TODO: save, copy,...,etc + +} Added: incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java?rev=680652&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java Tue Jul 29 02:33:18 2008 @@ -0,0 +1,193 @@ +package org.apache.hama.io; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.HbaseMapWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.io.Writable; +import org.apache.hama.Vector; + +public class VectorDatum implements Writable, Map { + private byte[] row = null; + private final HbaseMapWritable cells; + + public VectorDatum() { + this(null, new HbaseMapWritable()); + } + + /** + * Create a RowResult from a row and Cell map + */ + public VectorDatum(final byte[] row, final HbaseMapWritable m) { + this.row = row; + this.cells = m; + } + + /** + * Get the row for this RowResult + */ + public byte[] getRow() { + return row; + } + + public Cell put(@SuppressWarnings("unused") + byte[] key, @SuppressWarnings("unused") + Cell value) { + throw new UnsupportedOperationException("VectorDatum is read-only!"); + } + + @SuppressWarnings("unchecked") + public void putAll(@SuppressWarnings("unused") + Map map) { + throw new UnsupportedOperationException("VectorDatum is read-only!"); + } + + public Cell get(Object key) { + return (Cell) this.cells.get(key); + } + + public Cell remove(@SuppressWarnings("unused") + Object key) { + throw new UnsupportedOperationException("VectorDatum is read-only!"); + } + + public boolean containsKey(Object key) { + return cells.containsKey(key); + } + + public boolean containsValue(@SuppressWarnings("unused") + Object value) { + throw new UnsupportedOperationException("Don't support containsValue!"); + } + + public boolean isEmpty() { + return cells.isEmpty(); + } + + public int size() { + return cells.size(); + } + + public void clear() { + throw new UnsupportedOperationException("VectorDatum is read-only!"); + } + + public Set keySet() { + Set result = new TreeSet(Bytes.BYTES_COMPARATOR); + for (byte[] w : cells.keySet()) { + result.add(w); + } + return result; + } + + public Set> entrySet() { + return Collections.unmodifiableSet(this.cells.entrySet()); + } + + public Collection values() { + ArrayList result = new ArrayList(); + for (Writable w : cells.values()) { + result.add((Cell) w); + } + return result; + } + + /** + * Get the Cell that corresponds to column + */ + public Cell get(byte[] column) { + return this.cells.get(column); + } + + /** + * Get the Cell that corresponds to column, using a String key + */ + public Cell get(String key) { + return get(Bytes.toBytes(key)); + } + + /** + * Row entry. + */ + public class Entry implements Map.Entry { + private final byte[] column; + private final Cell cell; + + Entry(byte[] row, Cell cell) { + this.column = row; + this.cell = cell; + } + + public Cell setValue(@SuppressWarnings("unused") + Cell c) { + throw new UnsupportedOperationException("VectorDatum is read-only!"); + } + + public byte[] getKey() { + return column; + } + + public Cell getValue() { + return cell; + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("row="); + sb.append(Bytes.toString(this.row)); + sb.append(", cells={"); + boolean moreThanOne = false; + for (Map.Entry e : this.cells.entrySet()) { + if (moreThanOne) { + sb.append(", "); + } else { + moreThanOne = true; + } + sb.append("(column="); + sb.append(Bytes.toString(e.getKey())); + sb.append(", timestamp="); + sb.append(Long.toString(e.getValue().getTimestamp())); + sb.append(", value="); + byte[] v = e.getValue().getValue(); + if (Bytes.equals(e.getKey(), HConstants.COL_REGIONINFO)) { + try { + sb.append(Writables.getHRegionInfo(v).toString()); + } catch (IOException ioe) { + sb.append(ioe.toString()); + } + } else { + sb.append(v); + } + sb.append(")"); + } + sb.append("}"); + return sb.toString(); + } + + public void readFields(final DataInput in) throws IOException { + this.row = Bytes.readByteArray(in); + this.cells.readFields(in); + } + + public void write(final DataOutput out) throws IOException { + Bytes.writeByteArray(out, this.row); + this.cells.write(out); + } + + public Vector getVector() { + return new Vector(this); + } +} Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java?rev=680652&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java Tue Jul 29 02:33:18 2008 @@ -0,0 +1,58 @@ +package org.apache.hama.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; + +public class MatrixInputFormat extends MatrixInputFormatBase implements +JobConfigurable { +private final Log LOG = LogFactory.getLog(MatrixInputFormat.class); + +/** +* space delimited list of columns +* +* @see org.apache.hadoop.hbase.regionserver.HAbstractScanner for column name +* wildcards +*/ +public static final String COLUMN_LIST = "hbase.mapred.tablecolumns"; + +/** {@inheritDoc} */ +public void configure(JobConf job) { +Path[] tableNames = FileInputFormat.getInputPaths(job); +String colArg = job.get(COLUMN_LIST); +String[] colNames = colArg.split(" "); +byte [][] m_cols = new byte[colNames.length][]; +for (int i = 0; i < m_cols.length; i++) { + m_cols[i] = Bytes.toBytes(colNames[i]); +} +setInputColums(m_cols); +try { + setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName())); +} catch (Exception e) { + LOG.error(e); +} +} + +/** {@inheritDoc} */ +public void validateInput(JobConf job) throws IOException { +// expecting exactly one path +Path [] tableNames = FileInputFormat.getInputPaths(job); +if (tableNames == null || tableNames.length > 1) { + throw new IOException("expecting one table name"); +} + +// expecting at least one column +String colArg = job.get(COLUMN_LIST); +if (colArg == null || colArg.length() == 0) { + throw new IOException("expecting at least one column"); +} +} +} Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java?rev=680652&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java Tue Jul 29 02:33:18 2008 @@ -0,0 +1,275 @@ +package org.apache.hama.mapred; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scanner; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.filter.RowFilterSet; +import org.apache.hadoop.hbase.filter.StopRowFilter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.mapred.TableSplit; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.io.VectorDatum; + +public abstract class MatrixInputFormatBase +implements InputFormat { + private final Log LOG = LogFactory.getLog(MatrixInputFormatBase.class); + private byte [][] inputColumns; + private HTable table; + private TableRecordReader tableRecordReader; + private RowFilterInterface rowFilter; + + /** + * Iterate over an HBase table data, return (Text, VectorResult) pairs + */ + protected class TableRecordReader + implements RecordReader { + private byte [] startRow; + private byte [] endRow; + private RowFilterInterface trrRowFilter; + private Scanner scanner; + private HTable htable; + private byte [][] trrInputColumns; + + /** + * Build the scanner. Not done in constructor to allow for extension. + * + * @throws IOException + */ + public void init() throws IOException { + if ((endRow != null) && (endRow.length > 0)) { + if (trrRowFilter != null) { + final Set rowFiltersSet = + new HashSet(); + rowFiltersSet.add(new StopRowFilter(endRow)); + rowFiltersSet.add(trrRowFilter); + this.scanner = this.htable.getScanner(trrInputColumns, startRow, + new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL, + rowFiltersSet)); + } else { + this.scanner = + this.htable.getScanner(trrInputColumns, startRow, endRow); + } + } else { + this.scanner = + this.htable.getScanner(trrInputColumns, startRow, trrRowFilter); + } + } + + /** + * @param htable the {@link HTable} to scan. + */ + public void setHTable(HTable htable) { + this.htable = htable; + } + + /** + * @param inputColumns the columns to be placed in {@link VectorDatum}. + */ + public void setInputColumns(final byte [][] inputColumns) { + this.trrInputColumns = inputColumns; + } + + /** + * @param startRow the first row in the split + */ + public void setStartRow(final byte [] startRow) { + this.startRow = startRow; + } + + /** + * + * @param endRow the last row in the split + */ + public void setEndRow(final byte [] endRow) { + this.endRow = endRow; + } + + /** + * @param rowFilter the {@link RowFilterInterface} to be used. + */ + public void setRowFilter(RowFilterInterface rowFilter) { + this.trrRowFilter = rowFilter; + } + + /** {@inheritDoc} */ + public void close() throws IOException { + this.scanner.close(); + } + + /** + * @return ImmutableBytesWritable + * + * @see org.apache.hadoop.mapred.RecordReader#createKey() + */ + public ImmutableBytesWritable createKey() { + return new ImmutableBytesWritable(); + } + + /** + * @return VectorResult + * + * @see org.apache.hadoop.mapred.RecordReader#createValue() + */ + public VectorDatum createValue() { + return new VectorDatum(); + } + + /** {@inheritDoc} */ + public long getPos() { + // This should be the ordinal tuple in the range; + // not clear how to calculate... + return 0; + } + + /** {@inheritDoc} */ + public float getProgress() { + // Depends on the total number of tuples and getPos + return 0; + } + + /** + * @param key HStoreKey as input key. + * @param value MapWritable as input value + * + * Converts Scanner.next() to Text, VectorResult + * + * @return true if there was more data + * @throws IOException + */ + @SuppressWarnings("unchecked") + public boolean next(ImmutableBytesWritable key, VectorDatum value) + throws IOException { + RowResult result = this.scanner.next(); + boolean hasMore = result != null && result.size() > 0; + if (hasMore) { + key.set(result.getRow()); + Writables.copyWritable(result, value); + } + return hasMore; + } + } + + /** + * Builds a TableRecordReader. If no TableRecordReader was provided, uses + * the default. + * + * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, + * JobConf, Reporter) + */ + public RecordReader getRecordReader(InputSplit split, + @SuppressWarnings("unused") + JobConf job, @SuppressWarnings("unused") + Reporter reporter) + throws IOException { + TableSplit tSplit = (TableSplit) split; + TableRecordReader trr = this.tableRecordReader; + // if no table record reader was provided use default + if (trr == null) { + trr = new TableRecordReader(); + } + trr.setStartRow(tSplit.getStartRow()); + trr.setEndRow(tSplit.getEndRow()); + trr.setHTable(this.table); + trr.setInputColumns(this.inputColumns); + trr.setRowFilter(this.rowFilter); + trr.init(); + return trr; + } + + /** + * Calculates the splits that will serve as input for the map tasks. + *
    + * Splits are created in number equal to the smallest between numSplits and + * the number of {@link HRegion}s in the table. If the number of splits is + * smaller than the number of {@link HRegion}s then splits are spanned across + * multiple {@link HRegion}s and are grouped the most evenly possible. In the + * case splits are uneven the bigger splits are placed first in the + * {@link InputSplit} array. + * + * @param job the map task {@link JobConf} + * @param numSplits a hint to calculate the number of splits + * + * @return the input splits + * + * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int) + */ + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + byte [][] startKeys = this.table.getStartKeys(); + if (startKeys == null || startKeys.length == 0) { + throw new IOException("Expecting at least one region"); + } + if (this.table == null) { + throw new IOException("No table was provided"); + } + if (this.inputColumns == null || this.inputColumns.length == 0) { + throw new IOException("Expecting at least one column"); + } + int realNumSplits = numSplits > startKeys.length ? startKeys.length + : numSplits; + InputSplit[] splits = new InputSplit[realNumSplits]; + int middle = startKeys.length / realNumSplits; + int startPos = 0; + for (int i = 0; i < realNumSplits; i++) { + int lastPos = startPos + middle; + lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos; + splits[i] = new TableSplit(this.table.getTableName(), + startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] + : HConstants.EMPTY_START_ROW); + if (LOG.isDebugEnabled()) { + LOG.debug("split: " + i + "->" + splits[i]); + } + startPos = lastPos; + } + return splits; + + } + + /** + * @param inputColumns to be passed in {@link VectorDatum} to the map task. + */ + protected void setInputColums(byte [][] inputColumns) { + this.inputColumns = inputColumns; + } + + /** + * Allows subclasses to set the {@link HTable}. + * + * @param table to get the data from + */ + protected void setHTable(HTable table) { + this.table = table; + } + + /** + * Allows subclasses to set the {@link TableRecordReader}. + * + * @param tableRecordReader + * to provide other {@link TableRecordReader} implementations. + */ + protected void setTableRecordReader(TableRecordReader tableRecordReader) { + this.tableRecordReader = tableRecordReader; + } + + /** + * Allows subclasses to set the {@link RowFilterInterface} to be used. + * + * @param rowFilter + */ + protected void setRowFilter(RowFilterInterface rowFilter) { + this.rowFilter = rowFilter; + } +} \ No newline at end of file Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java?rev=680652&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java Tue Jul 29 02:33:18 2008 @@ -0,0 +1,54 @@ +package org.apache.hama.mapred; + +import java.io.IOException; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableInputFormat; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hama.io.VectorDatum; + +@SuppressWarnings("unchecked") +public abstract class MatrixMap + extends MapReduceBase implements Mapper { + /** + * Use this before submitting a TableMap job. It will + * appropriately set up the JobConf. + * + * @param table table name + * @param columns columns to scan + * @param mapper mapper class + * @param job job configuration + */ + public static void initJob(String table, String columns, + Class mapper, + Class outputKeyClass, + Class outputValueClass, JobConf job) { + + job.setInputFormat(MatrixInputFormat.class); + job.setMapOutputValueClass(outputValueClass); + job.setMapOutputKeyClass(outputKeyClass); + job.setMapperClass(mapper); + FileInputFormat.addInputPaths(job, table); + job.set(TableInputFormat.COLUMN_LIST, columns); + } + + /** + * Call a user defined function on a single HBase record, represented + * by a key and its associated record value. + * + * @param key + * @param value + * @param output + * @param reporter + * @throws IOException + */ + public abstract void map(ImmutableBytesWritable key, VectorDatum value, + OutputCollector output, Reporter reporter) throws IOException; +} Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java?rev=680652&view=auto ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java (added) +++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java Tue Jul 29 02:33:18 2008 @@ -0,0 +1,48 @@ +package org.apache.hama.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +@SuppressWarnings("unchecked") +public abstract class MatrixReduce + extends MapReduceBase implements Reducer { + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table + * @param reducer + * @param job + */ + public static void initJob(String table, + Class reducer, JobConf job) { + job.setOutputFormat(TableOutputFormat.class); + job.setReducerClass(reducer); + job.set(TableOutputFormat.OUTPUT_TABLE, table); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(BatchUpdate.class); + } + + /** + * + * @param key + * @param values + * @param output + * @param reporter + * @throws IOException + */ + public abstract void reduce(K key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException; +} \ No newline at end of file Added: incubator/hama/trunk/src/test/org/apache/hama/TestVector.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestVector.java?rev=680652&view=auto ============================================================================== --- incubator/hama/trunk/src/test/org/apache/hama/TestVector.java (added) +++ incubator/hama/trunk/src/test/org/apache/hama/TestVector.java Tue Jul 29 02:33:18 2008 @@ -0,0 +1,57 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama; + +import org.apache.hadoop.io.Text; + +public class TestVector extends HamaTestCase { + + /** + * Test cosine similarity + */ + public void testCosine() { + final double result = 0.6978227007909176; + Matrix m1 = new Matrix(conf, new Text("cosine")); + + // TODO : We need setArray(int row, double[] value) to matrix + // e.g. matrixA.setArray(0, new double[] {2,5,1,4}); + // -- Edward + + m1.set(0, 0, 2); + m1.set(0, 1, 5); + m1.set(0, 2, 1); + m1.set(0, 3, 4); + + m1.set(1, 0, 4); + m1.set(1, 1, 1); + m1.set(1, 2, 3); + m1.set(1, 3, 3); + + LOG.info("get test : " + m1.get(0, 0)); + LOG.info("get test : " + m1.get(0, 1)); + + Vector v1 = new Vector(m1.getRowResult(0)); + Vector v2 = new Vector(m1.getRowResult(1)); + + double cos = v1.getCosine(v2); + assertEquals(cos, result); + m1.close(); + } +} Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=680652&r1=680651&r2=680652&view=diff ============================================================================== --- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original) +++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Tue Jul 29 02:33:18 2008 @@ -27,17 +27,15 @@ import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.Cell; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.RowResult; -import org.apache.hadoop.hbase.mapred.TableMap; -import org.apache.hadoop.hbase.mapred.TableReduce; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; -import org.apache.hama.FeatureVector; +import org.apache.hama.Vector; import org.apache.hama.HamaTestCase; import org.apache.hama.Matrix; +import org.apache.hama.io.VectorDatum; import org.apache.log4j.Logger; /** @@ -45,8 +43,6 @@ */ public class TestMatrixMapReduce extends HamaTestCase { static final Logger LOG = Logger.getLogger(TestMatrixMapReduce.class); - protected Matrix a; - protected Matrix b; /** constructor */ public TestMatrixMapReduce() { @@ -54,39 +50,35 @@ } public static class AdditionMap extends - TableMap { - protected Matrix matrix_b; + MatrixMap { + protected Matrix B; public static final String MATRIX_B = "hama.addition.substraction.matrix.b"; public void configure(JobConf job) { - matrix_b = new Matrix(new HBaseConfiguration(), new Text("MatrixB")); + B = new Matrix(new HBaseConfiguration(), new Text("MatrixB")); } @Override - public void map(ImmutableBytesWritable key, RowResult value, - OutputCollector output, + public void map(ImmutableBytesWritable key, VectorDatum value, + OutputCollector output, Reporter reporter) throws IOException { - FeatureVector v1 = new FeatureVector(matrix_b.getRowResult(key.get())); - FeatureVector v2 = new FeatureVector(value); - FeatureVector v3 = v1.addition(v2); - output.collect(key, v3.getRowResult(key.get())); - - LOG.info("xxx" + v3.getValueAt(0)); - LOG.info("xxx" + v3.getValueAt(1)); + Vector v1 = new Vector(B.getRowResult(key.get())); + Vector v2 = value.getVector(); + output.collect(key, v1.addition(key.get(), v2)); } } public static class AdditionReduce extends - TableReduce { + MatrixReduce { @Override - public void reduce(ImmutableBytesWritable key, Iterator values, + public void reduce(ImmutableBytesWritable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { BatchUpdate b = new BatchUpdate(key.get()); - RowResult r = values.next(); + VectorDatum r = values.next(); for (Map.Entry f : r.entrySet()) { b.put(f.getKey(), f.getValue().getValue()); } @@ -96,16 +88,14 @@ } public void testMatrixMapReduce() throws IOException { - a = new Matrix(conf, new Text("MatrixA")); + Matrix a = new Matrix(conf, new Text("MatrixA")); a.set(0, 0, 1); a.set(0, 1, 0); - b = new Matrix(conf, new Text("MatrixB")); + Matrix b = new Matrix(conf, new Text("MatrixB")); b.set(0, 0, 1); b.set(0, 1, 1); - a.close(); b.close(); - miniMRJob(); } @@ -116,9 +106,9 @@ JobConf jobConf = new JobConf(conf, TestMatrixMapReduce.class); jobConf.setJobName("test MR job"); - TableMap.initJob("MatrixA", "column:", AdditionMap.class, - ImmutableBytesWritable.class, RowResult.class, jobConf); - TableReduce.initJob("xanadu", AdditionReduce.class, jobConf); + MatrixMap.initJob("MatrixA", "column:", AdditionMap.class, + ImmutableBytesWritable.class, VectorDatum.class, jobConf); + MatrixReduce.initJob("xanadu", AdditionReduce.class, jobConf); jobConf.setNumMapTasks(1); jobConf.setNumReduceTasks(1);