hama-commits mailing list archives

From tomm...@apache.org
Subject svn commit: r1558747 [1/2] - in /hama/trunk: commons/src/main/java/org/apache/hama/commons/io/ commons/src/main/java/org/apache/hama/commons/math/ ml/src/main/java/org/apache/hama/ml/recommendation/ ml/src/main/java/org/apache/hama/ml/recommendation/cf...
Date Thu, 16 Jan 2014 10:41:18 GMT
Author: tommaso
Date: Thu Jan 16 10:41:17 2014
New Revision: 1558747

URL: http://svn.apache.org/r1558747
Log:
HAMA-612 - adapted Ikhtiyor Ahmedov's patch to latest trunk, thanks Ikhtiyor

Added:
    hama/trunk/commons/src/main/java/org/apache/hama/commons/math/SquareVectorFunction.java   (with props)
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/ItemSimilarity.java   (with props)
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/Preference.java   (with props)
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/Recommender.java   (with props)
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/RecommenderIO.java   (with props)
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/UserSimilarity.java   (with props)
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/InputConverter.java   (with props)
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/KeyValueParser.java   (with props)
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/MovieLensConverter.java   (with props)
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/OnlineCF.java   (with props)
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/OnlineTrainBSP.java   (with props)
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/MeanAbsError.java   (with props)
    hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/OnlineUpdate.java   (with props)
    hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/
    hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java   (with props)
Modified:
    hama/trunk/commons/src/main/java/org/apache/hama/commons/io/MatrixWritable.java

Modified: hama/trunk/commons/src/main/java/org/apache/hama/commons/io/MatrixWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/io/MatrixWritable.java?rev=1558747&r1=1558746&r2=1558747&view=diff
==============================================================================
--- hama/trunk/commons/src/main/java/org/apache/hama/commons/io/MatrixWritable.java (original)
+++ hama/trunk/commons/src/main/java/org/apache/hama/commons/io/MatrixWritable.java Thu Jan 16 10:41:17 2014
@@ -69,5 +69,9 @@ public final class MatrixWritable implem
     }
     return mat;
   }
+  
+  public DoubleMatrix getMatrix() {
+    return this.mat;
+  }
 
 }

Added: hama/trunk/commons/src/main/java/org/apache/hama/commons/math/SquareVectorFunction.java
URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/math/SquareVectorFunction.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/commons/src/main/java/org/apache/hama/commons/math/SquareVectorFunction.java (added)
+++ hama/trunk/commons/src/main/java/org/apache/hama/commons/math/SquareVectorFunction.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.commons.math;
+
+@SuppressWarnings("deprecation")
+public class SquareVectorFunction implements DoubleVectorFunction {
+
+  @Override
+  public double calculate(int index, double value) {
+    return Math.pow(value, 2);
+  }
+
+}

Propchange: hama/trunk/commons/src/main/java/org/apache/hama/commons/math/SquareVectorFunction.java
------------------------------------------------------------------------------
    svn:eol-style = native
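
A minimal usage sketch (illustrative, not part of the diff above) of how SquareVectorFunction combines with vector subtraction to compute a Euclidean distance, the way OnlineCF.calculateUserSimilarity() further down in this commit does; the vectors and their values are hypothetical:

    DoubleVector a = new DenseDoubleVector(new double[] { 1.0, 2.0, 3.0 });
    DoubleVector b = new DenseDoubleVector(new double[] { 0.0, 2.0, 5.0 });
    // square each component of the difference, sum them, then take the square root
    double distance = Math.sqrt(a.subtract(b).apply(new SquareVectorFunction()).sum());
    // distance == sqrt(1 + 0 + 4) == sqrt(5)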

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/ItemSimilarity.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/ItemSimilarity.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/ItemSimilarity.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/ItemSimilarity.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.recommendation;
+
+import java.util.List;
+
+import org.apache.commons.math3.util.Pair;
+
+public interface ItemSimilarity {
+  /**
+   * calculate similarity between two items
+   * @param item1 - first item
+   * @param item2 - second item
+   * @return item similarity; 0 means identical items (smaller values mean more similar items)
+   */
+  double calculateItemSimilarity(long item1, long item2);
+
+  /**
+   * get most similar items
+   * @param item - item id
+   * @param count - number of similar items
+   * @return list of similar item ids (key) and similarities (value)
+   */
+  List<Pair<Long, Double>> getMostSimilarItems(long item, int count);
+}

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/ItemSimilarity.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/Preference.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/Preference.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/Preference.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/Preference.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.recommendation;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+
+/**
+ * User preference for a given item
+ * @param <U> - user id type, usually long
+ * @param <I> - item id type, usually long
+ */
+public class Preference<U, I> {
+  protected U userId;
+  protected I itemId;
+  protected DoubleWritable value = null;
+
+  public Preference(U userId, I itemId, double value) {
+    this.userId = userId;
+    this.itemId = itemId;
+    this.value = new DoubleWritable(value);
+  }
+  public U getUserId() {
+    return userId;
+  }
+  
+  public void setUserId(U userId) {
+    this.userId = userId;
+  }
+  
+  public I getItemId() {
+    return itemId;
+  }
+  
+  public void setItemId(I itemId) {
+    this.itemId = itemId;
+  }
+  
+  public DoubleWritable getValue() {
+    return value;
+  }
+  
+  public void setValue(double value) {
+    this.value = new DoubleWritable(value);
+  }
+}

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/Preference.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/Recommender.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/Recommender.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/Recommender.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/Recommender.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.recommendation;
+
+import java.util.List;
+
+/**
+ * Recommendation engine that is trained with BSP
+ * and predicts user preferences.
+ */
+public interface Recommender {
+  
+  /**
+   * train the model;
+   * the default behavior is to save the model after training
+   * @return true if success
+   */
+  boolean train();
+
+  /**
+   * save model
+   * @return true if success
+   */
+  boolean save();
+  
+  /**
+   * load model from the given path
+   * @param path - path of the saved model
+   * @param lazy - set to {@code true} when the model is bigger than the
+   *               available memory, so values are read from the file
+   *               during prediction; set to {@code false} to load the whole
+   *               model into memory for faster prediction
+   * @return true if success
+   */
+  boolean load(String path, boolean lazy);
+
+  /**
+   * estimate the preference of a user for an item
+   * @param userId - user id
+   * @param itemId - item id
+   * @return estimated preference value
+   */
+  double estimatePreference(long userId, long itemId);
+
+  /**
+   * get most preferred items for user
+   * @param userId - user id
+   * @param count - number of items to return
+   * @return most preferred items with their values
+   */
+  List<Preference<Long, Long>> getMostPreferredItems(long userId, int count); 
+}
+

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/Recommender.java
------------------------------------------------------------------------------
    svn:eol-style = native
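
For reference, a minimal sketch of the intended Recommender lifecycle, using the OnlineCF implementation added later in this commit; the paths are hypothetical and the input is assumed to already be in the SequenceFile format produced by InputConverter:

    OnlineCF recommender = new OnlineCF();
    recommender.setInputPreferences("/tmp/onlinecf/input");  // hypothetical input path
    recommender.setOutputPath("/tmp/onlinecf/model");        // hypothetical model path
    if (recommender.train()) {                               // runs the BSP job and saves the model
      recommender.load("/tmp/onlinecf/model", false);        // false = load the whole model into memory
      double score = recommender.estimatePreference(1L, 100L);
      List<Preference<Long, Long>> top = recommender.getMostPreferredItems(1L, 10);
    }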

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/RecommenderIO.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/RecommenderIO.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/RecommenderIO.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/RecommenderIO.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.recommendation;
+
+public interface RecommenderIO {
+  /**
+   * set preferences input data
+   * @param path - file path to preferences data
+   * 
+   */
+  void setInputPreferences(String path);
+  
+  /**
+   * set user features input data
+   * @param path - file path to user features data if any
+   * 
+   */
+  void setInputUserFeatures(String path);
+  
+  /**
+   * set item features input data
+   * @param path - file path to item features data if any
+   * 
+   */
+  void setInputItemFeatures(String path);
+
+  /**
+   * set output path for trained model
+   * @param path - output path
+   * 
+   */
+  void setOutputPath(String path);
+
+}

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/RecommenderIO.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/UserSimilarity.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/UserSimilarity.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/UserSimilarity.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/UserSimilarity.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.recommendation;
+
+import java.util.List;
+
+import org.apache.commons.math3.util.Pair;
+
+public interface UserSimilarity {
+  /**
+   * calculate similarity between two users
+   * @param user1 - first user
+   * @param user2 - second user
+   * @return user similarity; 0 means identical users (smaller values mean more similar users)
+   */
+  double calculateUserSimilarity(long user1, long user2);
+
+  /**
+   * get most similar users
+   * @param user - user id
+   * @param count - number of similar users
+   * @return list of similar user ids (key) and similarities (value)
+   */
+  List<Pair<Long, Double>> getMostSimilarUsers(long user, int count);
+}

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/UserSimilarity.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/InputConverter.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/InputConverter.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/InputConverter.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/InputConverter.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.recommendation.cf;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.commons.io.VectorWritable;
+
+/**
+ * 
+ * Helper class that converts input data
+ * into an OnlineCF-compatible format:
+ * set the inputs, then convert() parses them line by line.
+ *
+ */
+public class InputConverter {
+  private String inputPreferences = null;
+  private String inputUserFeatures = null;
+  private String inputItemFeatures = null;
+  private KeyValueParser<Text, VectorWritable> preferencesParser = null;
+  private KeyValueParser<Text, VectorWritable> userFeatureParser = null;
+  private KeyValueParser<Text, VectorWritable> itemFeatureParser = null;
+  
+  /**
+   * set the preferences input and its line-by-line parser
+   * @param path - path for preferences input data
+   * @param parser - line by line parser of input data
+   */
+  public void setInputPreferences(String path, KeyValueParser<Text, VectorWritable> parser) {
+    inputPreferences = path;
+    preferencesParser = parser;
+  }
+  
+  /**
+   * set the user features input and its line-by-line parser
+   * @param path - path for user features input data
+   * @param parser - line by line parser of input data
+   */
+  public void setInputUserFeatures(String path, KeyValueParser<Text, VectorWritable> parser) {
+    inputUserFeatures = path;
+    userFeatureParser = parser;
+  }
+  
+  /**
+   * set the item features input and its line-by-line parser
+   * @param path - path for item features input data
+   * @param parser - line by line parser of input data
+   */
+  public void setInputItemFeatures(String path, KeyValueParser<Text, VectorWritable> parser) {
+    inputItemFeatures = path;
+    itemFeatureParser = parser;
+  }
+
+  
+  /**
+   * convert the given inputs into the compatible output format and save it
+   * @param outputPath - output path of converted input
+   * @return true if success
+   */
+  public boolean convert(String outputPath) {
+    try {
+      Configuration conf = new Configuration();
+      //URI outputUri = new URI(outputPath);
+      Path outputDataPath = new Path(outputPath);
+      FileSystem fs = FileSystem.get(conf);
+      fs.delete(outputDataPath, true);
+      fs.createNewFile(outputDataPath);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
+          outputDataPath, Text.class, VectorWritable.class);
+
+      // inputPreferences
+      writeToFile(inputPreferences, OnlineCF.Settings.DFLT_PREFERENCE_DELIM, 
+          preferencesParser, fs, writer);
+      
+      // user features
+      writeToFile(inputUserFeatures, OnlineCF.Settings.DFLT_USER_DELIM, 
+          userFeatureParser, fs, writer);
+
+      // item features
+      writeToFile(inputItemFeatures, OnlineCF.Settings.DFLT_ITEM_DELIM, 
+          itemFeatureParser, fs, writer);
+
+      writer.close();
+      return true;
+
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    return false;
+  }
+
+  /**
+   * convert line by line and save
+   * @param inputPath - path of file to read
+   * @param defaultDelim - delimiter when writing to file as (delim_key, value)
+   * @param parser - line by line parser of input data
+   * @param fs - file system object
+   * @param writer - output file writer
+   * @throws IOException
+   */
+  @SuppressWarnings("deprecation")
+  private void writeToFile(
+      String inputPath,
+      String defaultDelim,
+      KeyValueParser<Text, VectorWritable> parser,
+      FileSystem fs,
+      SequenceFile.Writer writer) throws IOException {
+    if (inputPath == null || parser == null) {
+      return;
+    }
+    Path path = new Path(inputPath);
+    FSDataInputStream preferencesIn = fs.open(path);
+    String line = null;
+    while ((line = preferencesIn.readLine()) != null) {
+      parser.parseLine(line);
+      writer.append(new Text(defaultDelim + parser.getKey().toString()), 
+          parser.getValue());
+    }
+  }
+}

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/InputConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/KeyValueParser.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/KeyValueParser.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/KeyValueParser.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/KeyValueParser.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.recommendation.cf;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Takes a line and converts it into a key/value pair;
+ * the parsed result is held in the "K key"
+ * and "V value" fields
+ *
+ * @param <K> - key
+ * @param <V> - value
+ */
+public abstract class KeyValueParser<K extends Writable, V extends Writable> {
+  protected K key = null;
+  protected V value = null;
+  public abstract void parseLine(String ln);
+
+  /**
+   * 
+   * @return the key from the last parsed line,
+   * or null if no line has been parsed yet
+   */
+  public K getKey() {
+    return key;
+  }
+  
+  /**
+   * 
+   * @return the value from the last parsed line,
+   * or null if no line has been parsed yet
+   */
+  public V getValue() {
+    return value;
+  }
+}
+
+  
\ No newline at end of file

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/KeyValueParser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/MovieLensConverter.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/MovieLensConverter.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/MovieLensConverter.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/MovieLensConverter.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.recommendation.cf;
+
+import java.util.HashMap;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleVector;
+
+public class MovieLensConverter {
+  public boolean convert(String inputPrefs, String inputItemFeatures, String outputPath) {
+
+    InputConverter conv = new InputConverter();
+    String preferencesPath = inputPrefs;
+    String itemFeaturesPath = inputItemFeatures;
+
+    conv.setInputPreferences(preferencesPath, 
+        new KeyValueParser<Text, VectorWritable>() {
+      public void parseLine(String ln) {
+        String[] split = ln.split("::");
+        key = new Text(split[0]);
+        value = new VectorWritable();
+        double[] values = new double[2];
+        values[0] = Double.valueOf(split[1]);
+        values[1] = Double.valueOf(split[2]);
+        DenseDoubleVector dd = new DenseDoubleVector(values);
+        value.set(dd);
+      }
+    });
+
+    final HashMap<String, Integer> genreIndexes = new HashMap<String, Integer>();
+    genreIndexes.put("action", 0);
+    genreIndexes.put("animation", 1);
+    genreIndexes.put("children", 2);
+    genreIndexes.put("comedy", 3);
+    genreIndexes.put("crime", 4);
+    genreIndexes.put("documentary", 5);
+    genreIndexes.put("drama", 6);
+    genreIndexes.put("fantasy", 7);
+    genreIndexes.put("film-noir", 8);
+    genreIndexes.put("horror", 9);
+    genreIndexes.put("musical", 10);
+    genreIndexes.put("mystery", 11);
+    genreIndexes.put("romance", 12);
+    genreIndexes.put("sci-fi", 13);
+    genreIndexes.put("thriller", 14);
+    genreIndexes.put("war", 15);
+    genreIndexes.put("western", 16);
+    genreIndexes.put("imax", 17);
+    genreIndexes.put("adventure", 18);
+
+    conv.setInputItemFeatures(itemFeaturesPath, 
+        new KeyValueParser<Text, VectorWritable>() {
+      public void parseLine(String ln) {
+        String[] split = ln.split("::");
+        key = new Text(split[0]);
+        String[] genres = split[2].toLowerCase().split("[|]");
+        value = new VectorWritable();
+        
+        DenseDoubleVector values = new DenseDoubleVector(genreIndexes.size());
+        for (int i=0; i<genres.length; i++) {
+          Integer id = genreIndexes.get(genres[i]);
+          if (id == null) {
+            continue;
+          }
+          values.set(id, 1);
+        }
+        value.set(values);
+      }
+    });
+    return conv.convert(outputPath);
+  }
+}

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/MovieLensConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native
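
An illustrative sketch of converting raw MovieLens data with this class; the file paths are hypothetical, and the parsers above expect the "::"-delimited MovieLens layout (userId::movieId::rating for preferences, movieId::title::genre|genre|... for item features):

    MovieLensConverter converter = new MovieLensConverter();
    boolean ok = converter.convert(
        "/tmp/movielens/ratings.dat",  // hypothetical preferences file
        "/tmp/movielens/movies.dat",   // hypothetical item features file
        "/tmp/onlinecf/input");        // SequenceFile later consumed by OnlineCF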

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/OnlineCF.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/OnlineCF.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/OnlineCF.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/OnlineCF.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,543 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.recommendation.cf;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.InputMismatchException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.hama.commons.math.SquareVectorFunction;
+import org.apache.hama.ml.recommendation.ItemSimilarity;
+import org.apache.hama.ml.recommendation.Preference;
+import org.apache.hama.ml.recommendation.Recommender;
+import org.apache.hama.ml.recommendation.RecommenderIO;
+import org.apache.hama.ml.recommendation.UserSimilarity;
+import org.apache.hama.ml.recommendation.cf.function.MeanAbsError;
+import org.apache.hama.ml.recommendation.cf.function.OnlineUpdate;
+import org.apache.hama.ml.recommendation.cf.function.OnlineUpdate.Function;
+import org.apache.hama.ml.recommendation.cf.function.OnlineUpdate.InputStructure;
+
+public class OnlineCF implements Recommender, RecommenderIO, UserSimilarity, ItemSimilarity {
+
+  public static class Settings {
+    
+    // configuration strings
+    // delimiters for input data
+    public static final String CONF_INPUT_PREFERENCES_DELIM = "ml.recommender.cf.input.delim.preferences";
+    public static final String CONF_INPUT_USER_DELIM = "ml.recommender.cf.input.delim.user.features";
+    public static final String CONF_INPUT_ITEM_DELIM = "ml.recommender.cf.input.delim.item.features";
+    // delimiters for output data (trained model)
+    public static final String CONF_MODEL_USER_DELIM = "ml.recommender.cf.model.delim.user";
+    public static final String CONF_MODEL_ITEM_DELIM = "ml.recommender.cf.model.delim.item";
+    public static final String CONF_MODEL_USER_FEATURE_DELIM = "ml.recommender.cf.model.delim.user.features";
+    public static final String CONF_MODEL_ITEM_FEATURE_DELIM = "ml.recommender.cf.model.delim.item.features";
+
+    public static final String CONF_ITERATION_COUNT = "ml.recommender.cf.iterations";
+    public static final String CONF_MATRIX_RANK = "ml.recommender.cf.rank";
+    public static final String CONF_TASK_COUNT = "ml.recommender.cf.task.count";
+    public static final String CONF_SKIP_COUNT = "ml.recommender.cf.skip.count";
+
+    public static final String CONF_ONLINE_UPDATE_FUNCTION = "ml.recommender.cf.func.ou";
+  
+    // Message types
+    public static final IntWritable MSG_INP_USER_FEATURES = new IntWritable(0);
+    public static final IntWritable MSG_INP_ITEM_FEATURES = new IntWritable(1);
+    public static final IntWritable MSG_ITEM_MATRIX = new IntWritable(2);
+    public static final IntWritable MSG_ITEM_FEATURE_MATRIX = new IntWritable(3);
+    public static final IntWritable MSG_USER_FEATURE_MATRIX = new IntWritable(4);
+    public static final IntWritable MSG_SENDER_ID = new IntWritable(5);
+    public static final IntWritable MSG_VALUE = new IntWritable(6);
+    
+    // TODO: currently we support only one input;
+    //     if support for multiple inputs is added,
+    //     change the input path handling accordingly
+    public static final String CONF_INPUT_PATH = "ml.recommender.cf.input.path";
+    public static final String CONF_OUTPUT_PATH = "ml.recommender.cf.output.path";
+
+    // default values
+    public static final int DFLT_ITERATION_COUNT = 100;
+    public static final int DFLT_MATRIX_RANK = 10;
+    public static final int DFLT_SKIP_COUNT = 5;
+    
+    // used for delimiting input data; assumed to be one character long
+    public static final String DFLT_PREFERENCE_DELIM = "p";
+    public static final String DFLT_USER_DELIM = "u";
+    public static final String DFLT_ITEM_DELIM = "i";
+  
+    //used for delimiting output data (trained model)
+    public static final String DFLT_MODEL_USER_DELIM = "a";
+    public static final String DFLT_MODEL_ITEM_DELIM = "b";
+    // since the user/item feature models are matrices, the
+    // value has the form (matrix_rank, matrix_converted_to_vector)
+    // and its key carries no id; to avoid a crash while parsing
+    // values, a dedicated placeholder delimiter is used
+    public static final String DFLT_MODEL_USER_MTX_FEATURES_DELIM = "c";
+    public static final String DFLT_MODEL_ITEM_MTX_FEATURES_DELIM = "d";
+    
+    public static final String DFLT_MODEL_USER_FEATURES_DELIM = "e";
+    public static final String DFLT_MODEL_ITEM_FEATURES_DELIM = "f";
+  
+    public static final Class<? extends OnlineUpdate.Function> DFLT_UPDATE_FUNCTION = MeanAbsError.class;
+
+  } // Settings
+  
+  protected static Log LOG = LogFactory.getLog(OnlineCF.class);
+  HamaConfiguration conf = new HamaConfiguration();
+  // used only if model is loaded in memory
+  private HashMap<Long, VectorWritable> modelUserFactorizedValues = new HashMap<Long, VectorWritable>();
+  private HashMap<Long, VectorWritable> modelItemFactorizedValues = new HashMap<Long, VectorWritable>();
+  private HashMap<Long, VectorWritable> modelUserFeatures = new HashMap<Long, VectorWritable>();
+  private HashMap<Long, VectorWritable> modelItemFeatures = new HashMap<Long, VectorWritable>();
+  private DoubleMatrix modelUserFeatureFactorizedValues = null;
+  private DoubleMatrix modelItemFeatureFactorizedValues = null;
+  private String modelPath = null;
+  private boolean isLazyLoadModel = false;
+  private Function function = null;
+  
+  /**
+   * iteration count for matrix factorization
+   * @param count - iteration count
+   */
+  public void setIteration(int count) {
+    conf.setInt(OnlineCF.Settings.CONF_ITERATION_COUNT, count);
+  }
+  
+  /**
+   * Setting matrix rank for factorization
+   * @param rank - matrix rank
+   */
+  public void setMatrixRank(int rank) {
+    conf.setInt(OnlineCF.Settings.CONF_MATRIX_RANK, rank);
+  }
+  
+  /**
+   * Setting task count
+   * @param count - task count
+   */
+  public void setTaskCount(int count) {
+    conf.setInt(OnlineCF.Settings.CONF_TASK_COUNT, count);
+  }
+  
+  /**
+   * Online CF needs normalization of the factorized values;
+   * this configuration sets after how many iterations
+   * of calculation the values should be normalized between
+   * different items
+   * @param count - number of iterations between normalizations
+   */
+  public void setSkipCount(int count) {
+    conf.setInt(OnlineCF.Settings.CONF_SKIP_COUNT, count);
+  }
+  
+  @Override
+  public void setInputPreferences(String path) {
+    LOG.debug("path = " + path);
+    String alreadySetPath = conf.get(OnlineCF.Settings.CONF_INPUT_PATH, null);
+    if (alreadySetPath != null && !alreadySetPath.equals(path)) {
+      throw new InputMismatchException("different input path given" 
+                                    + ", old: " + alreadySetPath
+                                    + ", current:" + path);
+    }
+    conf.set(OnlineCF.Settings.CONF_INPUT_PATH, path);
+  }
+
+  @Override
+  public void setInputUserFeatures(String path) {
+    LOG.debug("path = " + path);
+    String alreadySetPath = conf.get(OnlineCF.Settings.CONF_INPUT_PATH, null);
+    if (alreadySetPath != null && !alreadySetPath.equals(path)) {
+      throw new InputMismatchException("different input path given" 
+                                    + ", old: " + alreadySetPath
+                                    + ", current:" + path);
+    }
+    conf.set(OnlineCF.Settings.CONF_INPUT_PATH, path);
+  }
+
+  @Override
+  public void setInputItemFeatures(String path) {
+    LOG.debug("path = " + path);
+    String alreadySetPath = conf.get(OnlineCF.Settings.CONF_INPUT_PATH, null);
+    if (alreadySetPath != null && !alreadySetPath.equals(path)) {
+      throw new InputMismatchException("different input path given" 
+                                    + ", old: " + alreadySetPath
+                                    + ", current:" + path);
+    }
+    conf.set(OnlineCF.Settings.CONF_INPUT_PATH, path);    
+  }
+
+  @Override
+  public void setOutputPath(String path) {
+    conf.set(OnlineCF.Settings.CONF_OUTPUT_PATH, path);
+  }
+
+  /**
+   * Set the update function used in the compute phase
+   * of the online CF training BSP
+   * @param cls - update function class
+   */
+  public void setUpdateFunction(Class<? extends OnlineUpdate.Function> cls) {
+    conf.setClass(OnlineCF.Settings.CONF_ONLINE_UPDATE_FUNCTION, cls, OnlineUpdate.Function.class);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean train() {
+    try {
+      BSPJob job = setupJob();
+      boolean res = job.waitForCompletion(true);
+      return res;
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
+    return false;
+  }
+
+  private BSPJob setupJob() throws IOException {
+    BSPJob job = new BSPJob(conf, OnlineCF.class);
+
+    String input = conf.get(OnlineCF.Settings.CONF_INPUT_PATH, null);
+    String output = conf.get(OnlineCF.Settings.CONF_OUTPUT_PATH, null);
+    Path in = new Path(input);
+    Path out = new Path(output);
+
+    if (conf.getInt(OnlineCF.Settings.CONF_MATRIX_RANK, -1) == -1) {
+      conf.setInt(OnlineCF.Settings.CONF_MATRIX_RANK, OnlineCF.Settings.DFLT_MATRIX_RANK);
+    }
+    
+    if (conf.getInt(OnlineCF.Settings.CONF_ITERATION_COUNT, -1) == -1) {
+      conf.setInt(OnlineCF.Settings.CONF_ITERATION_COUNT, OnlineCF.Settings.DFLT_ITERATION_COUNT);
+    }
+
+    if (conf.getInt(OnlineCF.Settings.CONF_SKIP_COUNT, -1) == -1) {
+      conf.setInt(OnlineCF.Settings.CONF_SKIP_COUNT, OnlineCF.Settings.DFLT_SKIP_COUNT);
+    }
+    
+    if (conf.getClass(OnlineCF.Settings.CONF_ONLINE_UPDATE_FUNCTION, null) == null) {
+      conf.setClass(OnlineCF.Settings.CONF_ONLINE_UPDATE_FUNCTION, OnlineCF.Settings.DFLT_UPDATE_FUNCTION,
+              OnlineUpdate.Function.class);
+    }
+    conf.set(OnlineCF.Settings.CONF_MODEL_USER_DELIM, OnlineCF.Settings.DFLT_MODEL_USER_DELIM);
+    conf.set(OnlineCF.Settings.CONF_MODEL_USER_FEATURE_DELIM, OnlineCF.Settings.DFLT_MODEL_USER_MTX_FEATURES_DELIM);
+    conf.set(OnlineCF.Settings.CONF_MODEL_ITEM_DELIM, OnlineCF.Settings.DFLT_MODEL_ITEM_DELIM);
+    conf.set(OnlineCF.Settings.CONF_MODEL_ITEM_FEATURE_DELIM, OnlineCF.Settings.DFLT_MODEL_ITEM_MTX_FEATURES_DELIM);
+
+    job.setJobName("Online CF");
+    job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
+    job.setPartitioner(HashPartitioner.class);
+    job.setBspClass(OnlineTrainBSP.class);
+
+    job.setInputPath(in);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(Text.class);
+    job.setInputValueClass(VectorWritable.class);
+
+    job.setOutputPath(out);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(VectorWritable.class);
+
+    job.setNumBspTask(conf.getInt(OnlineCF.Settings.CONF_TASK_COUNT, job.getNumBspTask()));
+    return job;
+  }
+
+  @Override
+  public boolean save() {
+    // default behavior is saving after training,
+    // since we cannot hold the model in memory after the BSP job
+    return true;
+  }
+
+  @Override
+  public boolean load(String path, boolean lazy) {
+    this.isLazyLoadModel = lazy;
+    this.modelPath = path;
+    if (!lazy) {
+      Configuration conf = new Configuration();
+      Path dataPath = new Path(modelPath);
+
+      try {
+        FileSystem fs = dataPath.getFileSystem(conf);
+        LinkedList<Path> files = new LinkedList<Path>();
+
+        if (!fs.exists(dataPath)) {
+          this.isLazyLoadModel = false;
+          this.modelPath = null;
+          return false;
+        }
+        
+        if(!fs.isFile(dataPath)) {
+          for (int i=0; i<100000; i++) {
+            Path partFile = new Path(modelPath + "/part-" + String.valueOf(100000 + i).substring(1, 6));
+            if(fs.exists(partFile)) {
+              files.add(partFile);
+            } else {
+              break;
+            }
+          }
+        } else {
+          files.add(dataPath);
+        }
+
+        LOG.info("loading model from " + path);
+        for (Path file : files){
+          SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
+          Text key = new Text();
+          VectorWritable value = new VectorWritable();
+          String strKey = null;
+          Long actualKey = null;
+          String firstSymbol = null;
+          while (reader.next(key, value)) {
+            strKey = key.toString();
+            firstSymbol = strKey.substring(0, 1);
+            try {
+              actualKey = Long.valueOf(strKey.substring(1));
+            } catch (Exception e) {
+              actualKey = new Long(0);
+            }
+
+            if (firstSymbol.equals(OnlineCF.Settings.DFLT_MODEL_ITEM_DELIM)) {
+              modelItemFactorizedValues.put(actualKey, new VectorWritable(value));
+            } else if (firstSymbol.equals(OnlineCF.Settings.DFLT_MODEL_USER_DELIM)) {
+              modelUserFactorizedValues.put(actualKey, new VectorWritable(value));
+            } else if (firstSymbol.equals(OnlineCF.Settings.DFLT_MODEL_USER_FEATURES_DELIM)) {
+              modelUserFeatures.put(actualKey, new VectorWritable(value));
+            } else if (firstSymbol.equals(OnlineCF.Settings.DFLT_MODEL_ITEM_FEATURES_DELIM)) {
+              modelItemFeatures.put(actualKey, new VectorWritable(value));
+            } else if (firstSymbol.equals(OnlineCF.Settings.DFLT_MODEL_USER_MTX_FEATURES_DELIM)) {
+              modelUserFeatureFactorizedValues = convertVectorWritable(value);
+            } else if (firstSymbol.equals(OnlineCF.Settings.DFLT_MODEL_ITEM_MTX_FEATURES_DELIM)) {
+              modelItemFeatureFactorizedValues = convertVectorWritable(value);
+            } else {
+              // unknown
+              continue;
+            }
+          }
+        }
+        LOG.info("loaded: " + modelUserFactorizedValues.size() + " users, "
+                        + modelUserFeatures.size() + " user features, "
+                        + modelItemFactorizedValues.size() + " items, "
+                        + modelItemFeatures.size() + " item feature values");
+
+      } catch (Exception e) {
+        e.printStackTrace();
+        this.isLazyLoadModel = false;
+        this.modelPath = null;
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private DoubleMatrix convertVectorWritable(VectorWritable value) {
+    //format of array: matrix_rank, matrix_converted_to_vector
+    DoubleVector vc = value.getVector();
+    int matrix_rank = (int) vc.get(0);
+    int matrix_size = vc.getLength()-1;
+    LinkedList<DoubleVector> slices = new LinkedList<DoubleVector>();
+    int offset = 1;
+    while (offset < matrix_size) {
+      slices.add(vc.slice(offset, matrix_rank));
+      offset += matrix_rank;
+    }
+    DoubleMatrix res = new DenseDoubleMatrix(slices.toArray(new DoubleVector[slices.size()]));
+    return res;
+  }
+
+  @Override
+  public double estimatePreference(long userId, long itemId) {
+    if (!isLazyLoadModel) {
+      if (function == null) {
+        Class<?> cls = conf.getClass(OnlineCF.Settings.CONF_ONLINE_UPDATE_FUNCTION, null);
+        try {
+          function = (OnlineUpdate.Function)(cls.newInstance());
+        } catch (Exception e) {
+          function = new MeanAbsError(); // fall back to the default update function
+        }
+      }
+
+      InputStructure e = new InputStructure();
+      e.item = this.modelItemFactorizedValues.get(Long.valueOf(itemId));
+      e.user = this.modelUserFactorizedValues.get(Long.valueOf(userId));
+      e.itemFeatureFactorized = this.modelItemFeatureFactorizedValues;
+      e.userFeatureFactorized = this.modelUserFeatureFactorizedValues;
+      e.itemFeatures = this.modelItemFeatures.get(Long.valueOf(itemId));
+      e.userFeatures = this.modelUserFeatures.get(Long.valueOf(userId));
+      if (e.item == null || e.user == null) {
+        return 0;
+      }
+
+      return function.predict(e);
+    }
+    return 0;
+    
+  }
+
+  @Override
+  public List<Preference<Long, Long>> getMostPreferredItems(long userId, int count) {
+    Comparator<Preference<Long, Long>> scoreComparator = new Comparator<Preference<Long, Long>>() {
+      
+      @Override
+      public int compare(Preference<Long, Long> arg0, Preference<Long, Long> arg1) {
+        double difference = arg0.getValue().get() - arg1.getValue().get(); 
+        return (int)(100000*difference);
+      }
+    };
+    PriorityQueue<Preference<Long, Long>> queue = new PriorityQueue<Preference<Long, Long>>(count, scoreComparator);
+    LinkedList<Preference<Long, Long>> results = new LinkedList<Preference<Long, Long>>();
+    
+    if (function == null) {
+      Class<?> cls = conf.getClass(OnlineCF.Settings.CONF_ONLINE_UPDATE_FUNCTION, null);
+      try {
+        function = (OnlineUpdate.Function)(cls.newInstance());
+      } catch (Exception e) {
+        function = new MeanAbsError(); // fall back to the default update function
+      }
+    }
+    
+    InputStructure e = new InputStructure();
+    e.user = this.modelUserFactorizedValues.get(Long.valueOf(userId));
+    e.userFeatureFactorized = this.modelUserFeatureFactorizedValues;
+    e.userFeatures = this.modelUserFeatures.get(Long.valueOf(userId));
+    e.itemFeatureFactorized = this.modelItemFeatureFactorizedValues;
+    if (e.user == null) {
+      return null;
+    }
+    
+    double score = 0.0;
+    for (Entry<Long, VectorWritable> item : modelItemFactorizedValues.entrySet()) {
+      e.item = item.getValue();
+      e.itemFeatures = this.modelItemFeatures.get(item.getKey());
+      score = function.predict(e);
+      queue.add(new Preference<Long, Long>(userId, item.getKey(), score));
+    }
+    results.addAll(queue);
+    return results;
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public double calculateUserSimilarity(long user1, long user2) {
+    VectorWritable usr1 = this.modelUserFactorizedValues.get(Long.valueOf(user1));
+    VectorWritable usr2 = this.modelUserFactorizedValues.get(Long.valueOf(user2));
+    if (usr1 == null || usr2 == null) {
+      return Double.MAX_VALUE;
+    }
+    
+    DoubleVector usr1Vector = usr1.getVector();
+    DoubleVector usr2Vector = usr2.getVector();
+    
+    // Euclidean distance
+    return Math.pow( usr1Vector
+                    .subtract(usr2Vector)
+                    .apply(new SquareVectorFunction())
+                    .sum() , 0.5);
+  }
+
+  @Override
+  public List<Pair<Long, Double>> getMostSimilarUsers(long user, int count) {
+    
+    Comparator<Pair<Long, Double>> similarityComparator = new Comparator<Pair<Long, Double>>() {
+      
+      @Override
+      public int compare(Pair<Long, Double> arg0, Pair<Long, Double> arg1) {
+        double difference = arg0.getValue().doubleValue() - arg1.getValue().doubleValue(); 
+        return (int)(100000*difference);
+      }
+    };
+    PriorityQueue<Pair<Long, Double>> queue = new PriorityQueue<Pair<Long, Double>>(count, similarityComparator);
+    LinkedList<Pair<Long, Double>> results = new LinkedList<Pair<Long, Double>>();
+    for ( Long candidateUser : modelUserFactorizedValues.keySet() ) {
+      double similarity = calculateUserSimilarity(user, candidateUser);
+      Pair<Long, Double> targetUser = new Pair<Long, Double>(candidateUser, similarity);
+      queue.add(targetUser);
+    }
+    results.addAll(queue);
+    return results;
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public double calculateItemSimilarity(long item1, long item2) {
+    VectorWritable itm1 = this.modelItemFactorizedValues.get(Long.valueOf(item1));
+    VectorWritable itm2 = this.modelItemFactorizedValues.get(Long.valueOf(item2));
+    if (itm1 == null || itm2 == null) {
+      return Double.MAX_VALUE;
+    }
+    
+    DoubleVector itm1Vector = itm1.getVector();
+    DoubleVector itm2Vector = itm2.getVector();
+    
+    // Euclidean distance
+    return Math.pow( itm1Vector
+                      .subtract(itm2Vector)
+                      .apply(new SquareVectorFunction())
+                      .sum() , 0.5);
+  }
+
+  @Override
+  public List<Pair<Long, Double>> getMostSimilarItems(long item, int count) {
+    
+    Comparator<Pair<Long, Double>> similarityComparator = new Comparator<Pair<Long, Double>>() {
+      
+      @Override
+      public int compare(Pair<Long, Double> arg0, Pair<Long, Double> arg1) {
+        double difference = arg0.getValue().doubleValue() - arg1.getValue().doubleValue(); 
+        return (int)(100000*difference);
+      }
+    };
+    PriorityQueue<Pair<Long, Double>> queue = new PriorityQueue<Pair<Long, Double>>(count, similarityComparator);
+    LinkedList<Pair<Long, Double>> results = new LinkedList<Pair<Long, Double>>();
+    for ( Long candidateItem : modelItemFactorizedValues.keySet() ) {
+      double similarity = calculateItemSimilarity(item, candidateItem);
+      Pair<Long, Double> targetItem = new Pair<Long, Double>(candidateItem, similarity);
+      queue.add(targetItem);
+    }
+    results.addAll(queue);
+    return results;
+  }
+
+}

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/OnlineCF.java
------------------------------------------------------------------------------
    svn:eol-style = native
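
An illustrative end-to-end sketch of configuring OnlineCF beyond its defaults and querying the model after loading it; all values and paths are hypothetical:

    OnlineCF cf = new OnlineCF();
    cf.setInputPreferences("/tmp/onlinecf/input");
    cf.setOutputPath("/tmp/onlinecf/model");
    cf.setMatrixRank(10);                     // otherwise DFLT_MATRIX_RANK is used
    cf.setIteration(100);                     // otherwise DFLT_ITERATION_COUNT is used
    cf.setSkipCount(5);                       // normalize every 5th iteration
    cf.setUpdateFunction(MeanAbsError.class); // the default update function
    if (cf.train()) {
      cf.load("/tmp/onlinecf/model", false);  // load the model into memory
      double distance = cf.calculateUserSimilarity(1L, 2L);  // Euclidean distance, 0 == identical
      List<Pair<Long, Double>> similar = cf.getMostSimilarUsers(1L, 5);
    }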

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/OnlineTrainBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/OnlineTrainBSP.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/OnlineTrainBSP.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/OnlineTrainBSP.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,599 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.recommendation.cf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.commons.io.MatrixWritable;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.hama.ml.recommendation.Preference;
+import org.apache.hama.ml.recommendation.cf.function.OnlineUpdate;
+
+public class OnlineTrainBSP extends
+    BSP<Text, VectorWritable, Text, VectorWritable, MapWritable> {
+
+  protected static Log LOG = LogFactory.getLog(OnlineTrainBSP.class);
+  
+  private String inputPreferenceDelim = null;
+  private String inputUserDelim = null;
+  private String inputItemDelim = null;
+
+  private int ITERATION = 0;
+  private int MATRIX_RANK = 0;
+  private int SKIP_COUNT = 0;
+  
+  // randomly initialized depending on the matrix rank,
+  // computed at runtime; represents the trained model
+  // userId, factorized value
+  private HashMap<String, VectorWritable> usersMatrix = new HashMap<String, VectorWritable>();
+  // itemId, factorized value
+  private HashMap<String, VectorWritable> itemsMatrix = new HashMap<String, VectorWritable>();
+  // matrixRank, factorized value
+  private DoubleMatrix userFeatureMatrix = null;
+  private DoubleMatrix itemFeatureMatrix = null;
+
+  // obtained from input data
+  // will not change during execution
+  private HashMap<String, VectorWritable> inpUsersFeatures = null;
+  private HashMap<String, VectorWritable> inpItemsFeatures = null;
+  
+  private OnlineUpdate.Function function = null;
+  
+  // Input Preferences
+  private ArrayList<Preference<String, String>> preferences = new ArrayList<Preference<String, String>>();
+  private ArrayList<Integer> indexes = new ArrayList<Integer>();
+  
+  Random rnd = new Random();
+
+  @Override
+  public void setup(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer)
+      throws IOException, SyncException, InterruptedException {
+
+    Configuration conf = peer.getConfiguration();
+
+    ITERATION = conf.getInt(OnlineCF.Settings.CONF_ITERATION_COUNT, OnlineCF.Settings.DFLT_ITERATION_COUNT);
+    MATRIX_RANK = conf.getInt(OnlineCF.Settings.CONF_MATRIX_RANK, OnlineCF.Settings.DFLT_MATRIX_RANK);
+    SKIP_COUNT = conf.getInt(OnlineCF.Settings.CONF_SKIP_COUNT, OnlineCF.Settings.DFLT_SKIP_COUNT);
+
+    inputItemDelim = conf.get(OnlineCF.Settings.CONF_INPUT_ITEM_DELIM, OnlineCF.Settings.DFLT_ITEM_DELIM);
+    inputUserDelim = conf.get(OnlineCF.Settings.CONF_INPUT_USER_DELIM, OnlineCF.Settings.DFLT_USER_DELIM);
+    inputPreferenceDelim = conf.get(OnlineCF.Settings.CONF_INPUT_PREFERENCES_DELIM, OnlineCF.Settings.DFLT_PREFERENCE_DELIM);
+
+    Class<?> cls = conf.getClass(OnlineCF.Settings.CONF_ONLINE_UPDATE_FUNCTION, null);
+    try {
+      function = (OnlineUpdate.Function) cls.newInstance();
+    } catch (Exception e) {
+      // fall back to the default update function if none could be instantiated
+      function = new MeanAbsError();
+    }
+  }
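+
+  // Illustrative only: the keys read in setup() are expected to be provided by
+  // the submitting job, e.g. (assuming a Hadoop Configuration named conf; the
+  // concrete values below are made up):
+  //   conf.setInt(OnlineCF.Settings.CONF_ITERATION_COUNT, 100);
+  //   conf.setInt(OnlineCF.Settings.CONF_MATRIX_RANK, 10);
+  //   conf.setInt(OnlineCF.Settings.CONF_SKIP_COUNT, 5);
+  //   conf.setClass(OnlineCF.Settings.CONF_ONLINE_UPDATE_FUNCTION,
+  //       MeanAbsError.class, OnlineUpdate.Function.class);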
+
+  @Override
+  public void bsp(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer)
+      throws IOException, SyncException, InterruptedException {
+    LOG.info(peer.getPeerName() + ") collecting input data");
+    // input partitioning begins here: a single input file mixes records of
+    // different types (preferences, user features and item features)
+    HashSet<Text> requiredUserFeatures = new HashSet<Text>();
+    HashSet<Text> requiredItemFeatures = new HashSet<Text>();
+    collectInput(peer, requiredUserFeatures, requiredItemFeatures);
+    // the keys carry type delimiters, so HashPartitioner does not place the
+    // records where we need them; keep the locally read preferences and
+    // request the user and item features from the peers that hold them
+    askForFeatures(peer, requiredUserFeatures, requiredItemFeatures);
+    peer.sync();
+
+    requiredUserFeatures = null;
+    requiredItemFeatures = null;
+    sendRequiredFeatures(peer);
+    peer.sync();
+
+    collectFeatures(peer);
+    LOG.info(peer.getPeerName() + ") collected: " + this.usersMatrix.size() + " users, "
+                           + this.itemsMatrix.size() + " items, "
+                           + this.preferences.size() + " preferences");
+    // input partitioning end
+    
+    // calculation steps
+    for (int i=0; i<ITERATION; i++) {
+      computeValues();
+      if ((i+1)%SKIP_COUNT == 0) {
+        normalizeWithBroadcastingValues(peer);
+      }
+    }
+
+    saveModel(peer);
+  }
+
+  private void normalizeWithBroadcastingValues(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer) 
+          throws IOException, SyncException, InterruptedException {
+    // normalize item factorized values
+    // normalize user/item feature matrix
+    peer.sync();
+    normalizeItemFactorizedValues(peer);
+    peer.sync();
+
+    if (itemFeatureMatrix != null) {
+      // item feature factorized values should be normalized
+      // capture the broadcast result, otherwise the normalized matrix is lost
+      itemFeatureMatrix = normalizeMatrix(peer, itemFeatureMatrix, OnlineCF.Settings.MSG_ITEM_FEATURE_MATRIX, true);
+      peer.sync();
+    }
+
+    if (userFeatureMatrix != null) {
+      // user feature factorized values should be normalized
+      // capture the broadcast result, otherwise the normalized matrix is lost
+      userFeatureMatrix = normalizeMatrix(peer, userFeatureMatrix, OnlineCF.Settings.MSG_USER_FEATURE_MATRIX, true);
+      peer.sync();
+    }
+  }
+
+  private DoubleMatrix normalizeMatrix(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer,
+      DoubleMatrix featureMatrix, IntWritable msgFeatureMatrix, boolean broadcast) 
+          throws IOException, SyncException, InterruptedException {
+    // send to master peer
+    MapWritable msg = new MapWritable();
+    MatrixWritable mtx = new MatrixWritable(featureMatrix);
+    msg.put(msgFeatureMatrix, mtx);
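+    // the peer in the middle of the peer list (index numPeers / 2) acts as the
+    // aggregating master for this exchange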
+    String master = peer.getPeerName(peer.getNumPeers()/2);
+    peer.send(master, msg);
+    peer.sync();
+
+    // normalize
+    DoubleMatrix res = null;
+    if (peer.getPeerName().equals(master)) {
+      res = new DenseDoubleMatrix(featureMatrix.getRowCount(),
+                                  featureMatrix.getColumnCount(), 0);
+      int incomingMsgCount = 0;
+      while ( (msg = peer.getCurrentMessage()) != null) {
+        MatrixWritable tmp = (MatrixWritable) msg.get(msgFeatureMatrix);
+        // capture the result: add() returns the sum instead of mutating res
+        res = res.add(tmp.getMatrix());
+        incomingMsgCount++;
+      }
+      res = res.divide(incomingMsgCount);
+    }
+
+    if (broadcast) {
+      if (peer.getPeerName().equals(master)) {
+        // broadcast to all
+        msg = new MapWritable();
+        msg.put(msgFeatureMatrix, new MatrixWritable(res));
+        // send to all
+        for (String peerName : peer.getAllPeerNames()) {
+          peer.send(peerName, msg);
+        }
+      }
+      peer.sync();
+      // every peer receives the normalized matrix from the master; assign it
+      // to res so it is returned (assigning the featureMatrix parameter would
+      // not be visible to the caller)
+      msg = peer.getCurrentMessage();
+      res = ((MatrixWritable) msg.get(msgFeatureMatrix)).getMatrix();
+    }
+    return res;
+  }
+
+  private VectorWritable convertMatrixToVector(DoubleMatrix mat) {
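+    // serialized layout: [MATRIX_RANK, m(0,0), m(0,1), ..., m(rows-1,cols-1)],
+    // i.e. the rank first, followed by the matrix entries in row-major order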
+    DoubleVector res = new DenseDoubleVector(mat.getRowCount()*mat.getColumnCount()+1);
+    int idx = 0;
+    res.set(idx, MATRIX_RANK);
+    idx++;
+    for (int i=0; i<mat.getRowCount(); i++) {
+      for (int j=0; j<mat.getColumnCount(); j++) {
+        res.set(idx, mat.get(i, j));
+        idx++;
+      }
+    }
+    return new VectorWritable(res);
+  }
+
+  private void normalizeItemFactorizedValues(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer) 
+          throws IOException, SyncException, InterruptedException {
+    // send item factorized matrices to selected peers
+    sendItemFactorizedValues(peer);
+    peer.sync();
+    
+    // receive item factorized matrices if this peer is selected and normalize them
+    HashMap<Text, LinkedList<IntWritable>> senderList = new HashMap<Text, LinkedList<IntWritable>>();
+    HashMap<Text, DoubleVector> normalizedValues = new HashMap<Text, DoubleVector>();
+    getNormalizedItemFactorizedValues(peer, normalizedValues, senderList);
+    
+    // send back normalized values to senders
+    sendTo(peer, senderList, normalizedValues);
+    peer.sync();
+    
+    // receive already normalized and synced data
+    receiveSyncedItemFactorizedValues(peer);
+  }
+
+  private void sendTo(BSPPeer<Text,VectorWritable,Text,VectorWritable,MapWritable> peer, 
+      HashMap<Text, LinkedList<IntWritable>> senderList,
+      HashMap<Text, DoubleVector> normalizedValues)
+          throws IOException {
+
+    for (Map.Entry<Text, DoubleVector> e : normalizedValues.entrySet()) {
+      MapWritable msgTmp = new MapWritable();
+      // send to interested peers
+      msgTmp.put(OnlineCF.Settings.MSG_ITEM_MATRIX, e.getKey());
+      msgTmp.put(OnlineCF.Settings.MSG_VALUE, new VectorWritable(e.getValue()));
+      Iterator<IntWritable> iter = senderList.get(e.getKey()).iterator(); 
+      while (iter.hasNext()) {
+        peer.send(peer.getPeerName(iter.next().get()), msgTmp);
+      }
+    }
+  }
+
+  private void getNormalizedItemFactorizedValues(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer,
+      HashMap<Text, DoubleVector> normalizedValues,
+      HashMap<Text, LinkedList<IntWritable>> senderList) 
+          throws IOException {
+
+    HashMap<Text, Integer> normalizedValueCount = new HashMap<Text, Integer>();
+    Text itemId = null;
+    VectorWritable value = null;
+    IntWritable senderId = null;
+    MapWritable msg = new MapWritable();
+
+    while ( (msg = peer.getCurrentMessage())!=null ) {
+      itemId = (Text) msg.get(OnlineCF.Settings.MSG_ITEM_MATRIX);
+      value = (VectorWritable) msg.get(OnlineCF.Settings.MSG_VALUE);
+      senderId = (IntWritable) msg.get(OnlineCF.Settings.MSG_SENDER_ID);
+
+      if (normalizedValues.containsKey(itemId) == false) {
+        DenseDoubleVector tmp = new DenseDoubleVector(MATRIX_RANK, 0.0);
+        normalizedValues.put(itemId, tmp);
+        normalizedValueCount.put(itemId, 0);
+        senderList.put(itemId, new LinkedList<IntWritable>());
+      }
+
+      normalizedValues.put(itemId, normalizedValues.get(itemId).add(value.getVector()));
+      normalizedValueCount.put(itemId, normalizedValueCount.get(itemId)+1);
+      senderList.get(itemId).add(senderId);
+    }
+
+    // normalize
+    for (Map.Entry<Text, DoubleVector> e : normalizedValues.entrySet()) {
+      double count = normalizedValueCount.get(e.getKey());
+      e.setValue(e.getValue().multiply(1.0/count));
+    }
+  }
+
+  private void receiveSyncedItemFactorizedValues(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer) 
+          throws IOException {
+    
+    MapWritable msg = new MapWritable();
+    Text itemId = null;
+    // drain the messages carrying the already-normalized values
+    while ((msg = peer.getCurrentMessage()) != null) {
+      itemId = (Text) msg.get(OnlineCF.Settings.MSG_ITEM_MATRIX);
+      itemsMatrix.put(itemId.toString(), (VectorWritable)msg.get(OnlineCF.Settings.MSG_VALUE));
+    }
+  }
+
+  private void sendItemFactorizedValues(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer) 
+          throws IOException, SyncException, InterruptedException {
+    int peerCount = peer.getNumPeers();
+    // item factorized values should be normalized
+    IntWritable peerId = new IntWritable(peer.getPeerIndex());
+
+    for (Map.Entry<String, VectorWritable> item : itemsMatrix.entrySet()) {
+      MapWritable msg = new MapWritable();
+      msg.put(OnlineCF.Settings.MSG_ITEM_MATRIX, new Text(item.getKey()));
+      msg.put(OnlineCF.Settings.MSG_VALUE, item.getValue());
+      msg.put(OnlineCF.Settings.MSG_SENDER_ID, peerId);
+      // mask the sign bit so the derived peer index is never negative
+      peer.send(peer.getPeerName((item.getKey().hashCode() & Integer.MAX_VALUE) % peerCount), msg);
+    }
+  }
+
+  private void computeValues() {
+    // shuffle the preference indexes (Fisher-Yates)
+    for (int i = indexes.size(); i > 0; i--) {
+      // rnd.nextInt(i) cannot go negative, unlike Math.abs(rnd.nextInt()) % i
+      int idx = rnd.nextInt(i);
+      int idxValue = indexes.get(idx);
+      indexes.set(idx, indexes.get(i - 1));
+      indexes.set(i - 1, idxValue);
+    }
+    
+    // compute values
+    OnlineUpdate.InputStructure inp = new OnlineUpdate.InputStructure();
+    OnlineUpdate.OutputStructure out = null;
+    Preference<String, String> pref = null;
+    for (Integer prefIdx : indexes) {
+      pref = preferences.get(prefIdx);
+
+      VectorWritable userFactorizedValues = usersMatrix.get(pref.getUserId());
+      VectorWritable itemFactorizedValues = itemsMatrix.get(pref.getItemId());
+      VectorWritable userFeatures = (inpUsersFeatures!=null)?inpUsersFeatures.get(pref.getUserId()):null;
+      VectorWritable itemFeatures = (inpItemsFeatures!=null)?inpItemsFeatures.get(pref.getItemId()):null;
+
+      inp.user = userFactorizedValues;
+      inp.item = itemFactorizedValues;
+      inp.expectedScore = pref.getValue();
+      inp.userFeatures = userFeatures;
+      inp.itemFeatures = itemFeatures;
+      inp.userFeatureFactorized = userFeatureMatrix;
+      inp.itemFeatureFactorized = itemFeatureMatrix;
+
+      out = function.compute(inp);
+
+      usersMatrix.put(pref.getUserId(), out.userFactorized);
+      itemsMatrix.put(pref.getItemId(), out.itemFactorized);
+      userFeatureMatrix = out.userFeatureFactorized;
+      itemFeatureMatrix = out.itemFeatureFactorized;
+    }
+  }
+
+  private void saveModel(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer) 
+          throws IOException, SyncException, InterruptedException {
+
+    // save user information
+    LOG.info(peer.getPeerName() + ") saving " + usersMatrix.size() + " users");
+    for (Map.Entry<String, VectorWritable> user : usersMatrix.entrySet()) {
+      peer.write( new Text(OnlineCF.Settings.DFLT_MODEL_USER_DELIM + user.getKey()), user.getValue());
+    }
+
+    // broadcast item values, normalize and save
+    sendItemFactorizedValues(peer);
+    peer.sync();
+    
+    HashMap<Text, LinkedList<IntWritable>> senderList = new HashMap<Text, LinkedList<IntWritable>>();
+    HashMap<Text, DoubleVector> normalizedValues = new HashMap<Text, DoubleVector>();
+    getNormalizedItemFactorizedValues(peer, normalizedValues, senderList);
+
+    saveItemFactorizedValues(peer, normalizedValues);
+
+    // broadcast item and user feature matrix
+    // normalize and save
+    if (itemFeatureMatrix != null) {
+      // save item features
+      for (Map.Entry<String, VectorWritable> feature : inpItemsFeatures.entrySet()) {
+        peer.write(new Text(OnlineCF.Settings.DFLT_MODEL_ITEM_FEATURES_DELIM+feature.getKey()), feature.getValue());
+      }
+      // item feature factorized values should be normalized
+      DoubleMatrix res = normalizeMatrix(peer, itemFeatureMatrix, 
+          OnlineCF.Settings.MSG_ITEM_FEATURE_MATRIX, false);
+      
+      if (res != null) {
+        Text key = new Text(OnlineCF.Settings.DFLT_MODEL_ITEM_MTX_FEATURES_DELIM + 
+            OnlineCF.Settings.MSG_ITEM_FEATURE_MATRIX.toString());
+        peer.write(key, convertMatrixToVector(res));
+      }
+    }
+
+    if (userFeatureMatrix != null) {
+      // save user features
+      for (Map.Entry<String, VectorWritable> feature : inpUsersFeatures.entrySet()) {
+        peer.write(new Text(OnlineCF.Settings.DFLT_MODEL_USER_FEATURES_DELIM+feature.getKey()), feature.getValue());
+      }
+      // user feature factorized values should be normalized
+      DoubleMatrix res = normalizeMatrix(peer, userFeatureMatrix, 
+          OnlineCF.Settings.MSG_USER_FEATURE_MATRIX, false);
+      
+      if (res != null) {
+        Text key = new Text(OnlineCF.Settings.DFLT_MODEL_USER_MTX_FEATURES_DELIM + 
+            OnlineCF.Settings.MSG_USER_FEATURE_MATRIX.toString());
+        peer.write(key, convertMatrixToVector(res));
+      }
+    }
+  }
+
+  private void saveItemFactorizedValues(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer,
+      HashMap<Text, DoubleVector> normalizedValues) 
+          throws IOException {
+    LOG.info(peer.getPeerName() + ") saving " + normalizedValues.size() + " items");
+    for (Map.Entry<Text, DoubleVector> item : normalizedValues.entrySet()) {
+      peer.write( new Text(OnlineCF.Settings.DFLT_MODEL_ITEM_DELIM + item.getKey().toString()), 
+          new VectorWritable(item.getValue()));
+    }
+  }
+
+  private void sendRequiredFeatures(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer) 
+      throws IOException, SyncException, InterruptedException {
+    
+    MapWritable msg = null;
+    int senderId = 0;
+
+    while ((msg = peer.getCurrentMessage()) != null) {
+      senderId = ((IntWritable)msg.get(OnlineCF.Settings.MSG_SENDER_ID)).get();
+      MapWritable resp = new MapWritable();
+      if (msg.containsKey(OnlineCF.Settings.MSG_INP_ITEM_FEATURES)) {
+        // send item feature
+        String itemId = ((Text)msg.get(OnlineCF.Settings.MSG_INP_ITEM_FEATURES)).toString().substring(1);
+        resp.put(OnlineCF.Settings.MSG_INP_ITEM_FEATURES, new Text(itemId));
+        resp.put(OnlineCF.Settings.MSG_VALUE, inpItemsFeatures.get(itemId));
+      } else if (msg.containsKey(OnlineCF.Settings.MSG_INP_USER_FEATURES)) {
+        // send user feature
+        String userId = ((Text)msg.get(OnlineCF.Settings.MSG_INP_USER_FEATURES)).toString().substring(1);
+        resp.put(OnlineCF.Settings.MSG_INP_USER_FEATURES, new Text(userId));
+        resp.put(OnlineCF.Settings.MSG_VALUE, inpUsersFeatures.get(userId));
+      }
+      peer.send(peer.getPeerName(senderId), resp);
+    }
+  }
+
+  private void collectFeatures(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer) 
+          throws IOException {
+    // remove all features,
+    // since we will get necessary features via messages
+    inpItemsFeatures = new HashMap<String, VectorWritable>();
+    inpUsersFeatures = new HashMap<String, VectorWritable>();
+
+    MapWritable msg = null;
+    int userFeatureSize = 0;
+    int itemFeatureSize = 0;
+    while ((msg = peer.getCurrentMessage()) != null) {
+      if (msg.containsKey(OnlineCF.Settings.MSG_INP_ITEM_FEATURES)) {
+        // received an item feature
+        String itemId = ((Text)msg.get(OnlineCF.Settings.MSG_INP_ITEM_FEATURES)).toString();
+        inpItemsFeatures.put(itemId, (VectorWritable)msg.get(OnlineCF.Settings.MSG_VALUE));
+        itemFeatureSize = ((VectorWritable)msg.get(OnlineCF.Settings.MSG_VALUE)).getVector().getLength();
+      } else if (msg.containsKey(OnlineCF.Settings.MSG_INP_USER_FEATURES)) {
+        // received a user feature
+        String userId = ((Text)msg.get(OnlineCF.Settings.MSG_INP_USER_FEATURES)).toString();
+        inpUsersFeatures.put(userId, (VectorWritable)msg.get(OnlineCF.Settings.MSG_VALUE));
+        userFeatureSize = ((VectorWritable)msg.get(OnlineCF.Settings.MSG_VALUE)).getVector().getLength();
+      }
+    }
+    if (inpItemsFeatures.size() > 0) {
+      itemFeatureMatrix = new DenseDoubleMatrix(MATRIX_RANK, itemFeatureSize, rnd);
+    }
+    if (inpUsersFeatures.size() > 0) {
+      userFeatureMatrix = new DenseDoubleMatrix(MATRIX_RANK, userFeatureSize, rnd);
+    }
+  }
+
+  private void askForFeatures(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer,
+      HashSet<Text> requiredUserFeatures, 
+      HashSet<Text> requiredItemFeatures) 
+          throws IOException, SyncException, InterruptedException {
+    int peerCount = peer.getNumPeers();
+    int peerId = peer.getPeerIndex();
+    
+    if (requiredUserFeatures != null) {
+      Iterator<Text> iter = requiredUserFeatures.iterator();
+      Text key = null;
+      while (iter.hasNext()) {
+        MapWritable msg = new MapWritable();
+        key = iter.next();
+        msg.put(OnlineCF.Settings.MSG_INP_USER_FEATURES, key);
+        msg.put(OnlineCF.Settings.MSG_SENDER_ID, new IntWritable(peerId));
+        peer.send(peer.getPeerName((key.hashCode() & Integer.MAX_VALUE) % peerCount), msg);
+      }
+    }
+
+    if (requiredItemFeatures != null) {
+      Iterator<Text> iter = requiredItemFeatures.iterator();
+      Text key = null;
+      while (iter.hasNext()) {
+        MapWritable msg = new MapWritable();
+        key = iter.next();
+        msg.put(OnlineCF.Settings.MSG_INP_ITEM_FEATURES, key);
+        msg.put(OnlineCF.Settings.MSG_SENDER_ID, new IntWritable(peerId));
+        peer.send(peer.getPeerName((key.hashCode() & Integer.MAX_VALUE) % peerCount), msg);
+      }
+    }
+  }
+
+  private void collectInput(
+      BSPPeer<Text, VectorWritable, Text, VectorWritable, MapWritable> peer,
+      HashSet<Text> requiredUserFeatures,
+      HashSet<Text> requiredItemFeatures) 
+          throws IOException {
+    Text key = new Text();
+    VectorWritable value = new VectorWritable();
+    int counter = 0;
+
+    // the sets are created by the caller and filled in place here;
+    // reassigning the parameters would not be visible to bsp()
+
+    while(peer.readNext(key, value)) {
+      // key format: (0, 1..n)
+      //  0 - delimiter character identifying the type of the key
+      //  1..n - actual key value
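+      //  e.g. if inputPreferenceDelim were "p" (the real default delimiters live
+      //  in OnlineCF.Settings and are not shown in this diff), key "p42" would
+      //  denote the preferences of user 42 and the value vector would carry
+      //  (itemId, score)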
+      String firstSymbol = key.toString().substring(0, 1);
+      String actualId = key.toString().substring(1);
+
+      if (firstSymbol.equals(inputPreferenceDelim)) {
+        // parse as <k:userId, v:(itemId, score)>
+        String itemId = Long.toString((long)value.getVector().get(0));
+        String score = Double.toString(value.getVector().get(1));
+
+        if (usersMatrix.containsKey(actualId) == false) {
+          DenseDoubleVector vals = new DenseDoubleVector(MATRIX_RANK);
+          for (int i=0; i<MATRIX_RANK; i++) {
+            vals.set(i, rnd.nextDouble());
+          }
+          VectorWritable rndValues = new VectorWritable(vals);
+          usersMatrix.put(actualId, rndValues);
+        }
+
+        if (itemsMatrix.containsKey(itemId) == false) {
+          DenseDoubleVector vals = new DenseDoubleVector(MATRIX_RANK);
+          for (int i=0; i<MATRIX_RANK; i++) {
+            vals.set(i, rnd.nextDouble());
+          }
+          VectorWritable rndValues = new VectorWritable(vals);
+          itemsMatrix.put(itemId, rndValues);
+        }
+        preferences.add(new Preference<String, String>(actualId, itemId, Double.parseDouble(score)));
+        indexes.add(counter);
+        
+        // the input was split with HashPartitioner, so the peer holding a
+        // user/item feature record can be derived from its original key;
+        // remember which feature keys this peer still has to ask for
+        requiredUserFeatures.add(new Text(inputUserDelim+actualId));
+        requiredItemFeatures.add(new Text(inputItemDelim+itemId));
+        counter++;
+      } else if (firstSymbol.equals(inputUserDelim)) {
+        // parse as <k:userId, v:(ArrayOfFeatureValues)>
+        if (inpUsersFeatures == null) {
+          inpUsersFeatures = new HashMap<String, VectorWritable>();          
+        }
+        inpUsersFeatures.put(actualId, value);
+
+      } else if (firstSymbol.equals(inputItemDelim)) {
+        // parse as <k:itemId, v:(ArrayOfFeatureValues)>
+        if (inpItemsFeatures == null) {
+          inpItemsFeatures = new HashMap<String, VectorWritable>();
+        }
+        inpItemsFeatures.put(actualId, value);
+        
+      } else {
+        // just skip, maybe we should throw exception
+        continue;
+      }
+    }
+  }
+
+
+}

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/OnlineTrainBSP.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/MeanAbsError.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/MeanAbsError.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/MeanAbsError.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/MeanAbsError.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.recommendation.cf.function;
+
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleMatrix;
+import org.apache.hama.commons.math.DenseDoubleVector;
+import org.apache.hama.commons.math.DoubleMatrix;
+import org.apache.hama.commons.math.DoubleVector;
+import org.apache.hama.ml.recommendation.cf.function.OnlineUpdate.InputStructure;
+import org.apache.hama.ml.recommendation.cf.function.OnlineUpdate.OutputStructure;
+
+/**
+ * <h3>Mean absolute error update function for Online CF,
+ * as described in the paper referenced in HAMA-612</h3>
+ * Input:
+ * <ul>
+ *   <li>User features matrix       - x (n x C)</li>
+ *   <li>Item features matrix       - y (n x D)</li>
+ * </ul>
+ *
+ * Initialize randomly:
+ * <ul>
+ *   <li>User factorized values α (n x k);</li>
+ *   <li>Item factorized values β (n x k);</li>
+ *   <li>User feature factorized values μ (k x C);</li>
+ *   <li>Item feature factorized values ν (k x D);</li>
+ * </ul>
+ * 
+ * Algorithm:
+ * <pre>for each (a, b, r) do:
+ *   Compute R ← (α_al + μ_l: * x_a:)(β_bl + ν_l: * y_b:)
+ *   for l=1 to MATRIX_RANK do:
+ *     α_al ← α_al + 2τ * (β_bl + ν_l: * y_b:)(r − R)
+ *     β_bl ← β_bl + 2τ * (α_al + μ_l: * x_a:)(r − R)
+ *     for c = 1 to C do:
+ *       μ_lc ← μ_lc + 2τ * x_ac (β_bl + ν_l: * y_b:)(r − R)
+ *     for d = 1 to D do:
+ *       ν_ld ← ν_ld + 2τ * y_bd (α_al + μ_l: * x_a:)(r − R)
+ * 
+ * Output: α, β, μ, ν
+ * </pre>
+ * 
+ * The class below implements the computation phase inside the <b>for each (a, b, r) do</b> block.
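+ * <p>
+ * A small worked example (illustrative numbers only, not taken from the paper):
+ * with k = MATRIX_RANK = 2, no user or item features, τ = 0.01 (TETTA below),
+ * α_a = (0.5, 0.5), β_b = (0.5, 0.5) and an observed score r = 4.0, the predicted
+ * score is R = 0.5*0.5 + 0.5*0.5 = 0.5, so 2τ(r − R) = 0.07 and every factor
+ * becomes 0.5 + 0.5 * 0.07 = 0.535 (both updates use the pre-update values).
+ * </p>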
+ */
+public class MeanAbsError implements OnlineUpdate.Function{
+  private static final double TETTA = 0.01; 
+  private DoubleVector zeroVector = null;
+  @Override
+  public OutputStructure compute(InputStructure e) {
+    OutputStructure res = new OutputStructure();
+
+    int rank = e.user.getVector().getLength();
+    if (zeroVector == null) {
+      zeroVector = new DenseDoubleVector(rank, 0);
+    }
+    // below vectors are all size of MATRIX_RANK
+    DoubleVector vl_yb_item = zeroVector;
+    DoubleVector ml_xa_user = zeroVector;
+    DoubleVector bbl_vl_yb = null;
+    DoubleVector aal_ml_xa = null;
+
+    boolean isAvailableUserFeature = (e.userFeatures!=null);
+    boolean isAvailableItemFeature = (e.itemFeatures!=null);
+
+    if (isAvailableItemFeature) {
+      DoubleVector yb = e.itemFeatures.getVector();
+      vl_yb_item = e.itemFeatureFactorized.multiplyVector(yb);
+    }
+    
+    if (isAvailableUserFeature) {
+      DoubleVector xa = e.userFeatures.getVector();
+      ml_xa_user = e.userFeatureFactorized.multiplyVector(xa);
+    }
+    
+    bbl_vl_yb = e.item.getVector().add(vl_yb_item);
+    aal_ml_xa = e.user.getVector().add(ml_xa_user);
+    
+    //calculated score
+    double calculatedScore = aal_ml_xa.multiply(bbl_vl_yb).sum();
+    double expectedScore = e.expectedScore.get();
+    double scoreDifference = 0.0;
+    scoreDifference = expectedScore - calculatedScore;
+    
+    // β_bl ← β_bl + 2τ * (α_al + μ_l: * x_a:)(r − R)
+    // items ← item + itemFactorization (will be used later)
+    DoubleVector itemFactorization = aal_ml_xa.multiply(2*TETTA*scoreDifference);
+    DoubleVector items = e.item.getVector().add( itemFactorization );
+    res.itemFactorized = new VectorWritable(items);
+    
+    // α_al ← α_al + 2τ * (β_bl + ν_l: * y_b:)(r − R)
+    // users ← user + userFactorization (will be used later)
+    DoubleVector userFactorization = bbl_vl_yb.multiply(2*TETTA*scoreDifference);
+    DoubleVector users = e.user.getVector().add( userFactorization );
+    res.userFactorized = new VectorWritable(users);
+
+    // for d = 1 to D do:
+    //   ν_ld ← ν_ld + 2τ * y_bd (α_al + μ_l: * x_a:)(r − R)
+    // for c = 1 to C do:
+    //   μ_lc ← μ_lc + 2τ * x_ac (β_bl + ν_l: * y_b:)(r − R)
+    // 
+    // ν_ld and μ_lc (V) are matrices,
+    // while 2τ * y_bd (α_al + μ_l: * x_a:)(r − R) (M below) is a vector per row;
+    // so the update matrix is assembled row by row (Mtransposed) and then
+    // added to the factorized matrix.
+    DoubleMatrix tmp = null;
+    if (isAvailableItemFeature) {
+      DoubleVector[] Mtransposed = new DenseDoubleVector[rank];
+      for (int i = 0; i<rank; i++) {
+        Mtransposed[i] = e.itemFeatureFactorized.getRowVector(i).multiply(aal_ml_xa.get(i));
+      }
+      tmp = new DenseDoubleMatrix(Mtransposed);
+      tmp = tmp.multiply(2*TETTA*scoreDifference);
+      res.itemFeatureFactorized = e.itemFeatureFactorized.add(tmp);
+    }
+
+    if (isAvailableUserFeature) {
+      DoubleVector[] Mtransposed = new DenseDoubleVector[rank];
+      for (int i = 0; i<rank; i++) {
+        Mtransposed[i] = e.userFeatureFactorized.getRowVector(i).multiply(bbl_vl_yb.get(i));
+      }
+      tmp = new DenseDoubleMatrix(Mtransposed);
+      tmp = tmp.multiply(2*TETTA*scoreDifference);
+      res.userFeatureFactorized = e.userFeatureFactorized.add(tmp);
+    }
+    return res;
+  }
+  @Override
+  public double predict(InputStructure e) {
+    int rank = e.user.getVector().getLength();
+    if (zeroVector == null) {
+      zeroVector = new DenseDoubleVector(rank, 0);
+    }
+    // below vectors are all size of MATRIX_RANK
+    DoubleVector vl_yb_item = zeroVector;
+    DoubleVector ml_xa_user = zeroVector;
+    DoubleVector bbl_vl_yb = null;
+    DoubleVector aal_ml_xa = null;
+
+    boolean isAvailableUserFeature = (e.userFeatures!=null);
+    boolean isAvailableItemFeature = (e.itemFeatures!=null);
+
+    if (isAvailableItemFeature) {
+      DoubleVector yb = e.itemFeatures.getVector();
+      vl_yb_item = e.itemFeatureFactorized.multiplyVector(yb);
+    }
+
+    if (isAvailableUserFeature) {
+      DoubleVector xa = e.userFeatures.getVector();
+      ml_xa_user = e.userFeatureFactorized.multiplyVector(xa);
+    }
+    
+    bbl_vl_yb = e.item.getVector().add(vl_yb_item);
+    aal_ml_xa = e.user.getVector().add(ml_xa_user);
+    
+    //calculated score
+    double calculatedScore = aal_ml_xa.multiply(bbl_vl_yb).sum();
+    return calculatedScore;
+  }
+}

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/MeanAbsError.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/OnlineUpdate.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/OnlineUpdate.java?rev=1558747&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/OnlineUpdate.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/OnlineUpdate.java Thu Jan 16 10:41:17 2014
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.recommendation.cf.function;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DoubleMatrix;
+
+public class OnlineUpdate {
+  public static class InputStructure {
+    public VectorWritable user;
+    public VectorWritable item;
+    public VectorWritable userFeatures;
+    public VectorWritable itemFeatures;
+    public DoubleMatrix userFeatureFactorized;
+    public DoubleMatrix  itemFeatureFactorized;
+    public DoubleWritable expectedScore;
+  }
+
+  public static class OutputStructure {
+    public VectorWritable userFactorized;
+    public VectorWritable itemFactorized;
+    public DoubleMatrix  userFeatureFactorized;
+    public DoubleMatrix  itemFeatureFactorized;
+  }
+
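+  // Implementations (e.g. MeanAbsError in this package) are plugged in through
+  // the job configuration; a sketch of the expected wiring (the exact driver
+  // code is not part of this diff):
+  //   conf.setClass(OnlineCF.Settings.CONF_ONLINE_UPDATE_FUNCTION,
+  //       MeanAbsError.class, OnlineUpdate.Function.class);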
+  public static interface Function {
+    OnlineUpdate.OutputStructure compute(OnlineUpdate.InputStructure e);
+    double predict(OnlineUpdate.InputStructure e);
+  }
+}

Propchange: hama/trunk/ml/src/main/java/org/apache/hama/ml/recommendation/cf/function/OnlineUpdate.java
------------------------------------------------------------------------------
    svn:eol-style = native


