hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1679303 - in /hama/trunk: core/src/main/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/bsp/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph...
Date Thu, 14 May 2015 01:48:39 GMT
Author: edwardyoon
Date: Thu May 14 01:48:38 2015
New Revision: 1679303

URL: http://svn.apache.org/r1679303
Log:
HAMA-956: Support force-setting the no. of tasks

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
    hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu May 14 01:48:38 2015
@@ -45,7 +45,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
@@ -58,11 +57,6 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.message.MessageManager;
-import org.apache.hama.bsp.message.OutgoingMessageManager;
-import org.apache.hama.bsp.message.OutgoingPOJOMessageBundle;
-import org.apache.hama.bsp.message.queue.MessageQueue;
-import org.apache.hama.bsp.message.queue.SortedMemoryQueue;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.ipc.JobSubmissionProtocol;
 import org.apache.hama.ipc.RPC;
@@ -349,24 +343,32 @@ public class BSPJobClient extends Config
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
 
-      InputSplit[] splits = job.getInputFormat().getSplits(job, (maxTasks > configured)
? configured : maxTasks);
-      
+      InputSplit[] splits = job.getInputFormat().getSplits(job,
+          (maxTasks > configured) ? configured : maxTasks);
+
       if (maxTasks < splits.length) {
         throw new IOException(
             "Job failed! The number of splits has exceeded the number of max tasks. The number
of splits: "
                 + splits.length + ", The number of max tasks: " + maxTasks);
       }
-      
-      /*
-      job = partition(job, splits, maxTasks);
-      maxTasks = job.getInt("hama.partition.count", maxTasks);
 
-      if (job.getBoolean("input.has.partitioned", false)) {
-        splits = job.getInputFormat().getSplits(job, maxTasks);
+      /*
+      FIXME now graph job doesn't use this runtime input partitioning
+      Should we support this feature at BSP framework level?
+      
+      if(job.getConfiguration().getBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, false))
{
+        job = partition(job, splits, maxTasks);
+        maxTasks = job.getInt("hama.partition.count", maxTasks);
       }
       */
+      
+      int numOfSplits = writeSplits(job, splits, submitSplitFile, maxTasks);
+      if (numOfSplits > configured
+          || !job.getConfiguration().getBoolean("hama.force.set.bsp.tasks",
+              false)) {
+        job.setNumBspTask(numOfSplits);
+      }
 
-      job.setNumBspTask(writeSplits(job, splits, submitSplitFile, maxTasks));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
 
@@ -407,105 +409,6 @@ public class BSPJobClient extends Config
     return launchJob(jobId, job, submitJobFile, fs);
   }
 
-  protected BSPJob partition(BSPJob job, InputSplit[] splits, int maxTasks)
-      throws IOException {
-    String inputPath = job.getConfiguration().get(Constants.JOB_INPUT_DIR);
-
-    Path partitionDir = new Path("/tmp/hama-parts/" + job.getJobID() + "/");
-    if (fs.exists(partitionDir)) {
-      fs.delete(partitionDir, true);
-    }
-
-    if (job.get("bsp.partitioning.runner.job") != null) {
-      return job;
-    }// Early exit for the partitioner job.
-
-    if (inputPath != null) {
-      int numSplits = splits.length;
-      int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(" numTasks = "
-            + numTasks
-            + " numSplits = "
-            + numSplits
-            + " enable = "
-            + (job.getConfiguration().getBoolean(
-                Constants.ENABLE_RUNTIME_PARTITIONING, false)
-                + " class = " + job.getConfiguration().get(
-                Constants.RUNTIME_PARTITIONING_CLASS)));
-      }
-
-      if (numTasks == 0) {
-        numTasks = numSplits;
-      }
-
-      if (job.getConfiguration().getBoolean(
-          Constants.ENABLE_RUNTIME_PARTITIONING, false)
-          && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) !=
null) {
-
-        HamaConfiguration conf = new HamaConfiguration(job.getConfiguration());
-
-        conf.setInt(Constants.RUNTIME_DESIRED_PEERS_COUNT, numTasks);
-        if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
-          conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration()
-              .get(Constants.RUNTIME_PARTITIONING_DIR));
-        }
-
-        conf.set(Constants.RUNTIME_PARTITIONING_CLASS,
-            job.get(Constants.RUNTIME_PARTITIONING_CLASS));
-        BSPJob partitioningJob = new BSPJob(conf);
-        partitioningJob.setJobName("Runtime partitioning job for "
-            + partitioningJob.getJobName());
-        LOG.debug("partitioningJob input: "
-            + partitioningJob.get(Constants.JOB_INPUT_DIR));
-        
-        partitioningJob.getConfiguration().setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
-            OutgoingPOJOMessageBundle.class, OutgoingMessageManager.class);
-        partitioningJob.getConfiguration().setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
-            SortedMemoryQueue.class, MessageQueue.class);
-        
-        partitioningJob.setInputFormat(job.getInputFormat().getClass());
-        partitioningJob.setInputKeyClass(job.getInputKeyClass());
-        partitioningJob.setInputValueClass(job.getInputValueClass());
-        
-        partitioningJob.setOutputFormat(SequenceFileOutputFormat.class);
-        partitioningJob.setOutputKeyClass(job.getInputKeyClass());
-        partitioningJob.setOutputValueClass(job.getInputValueClass());
-        
-        partitioningJob.setBspClass(PartitioningRunner.class);
-        partitioningJob.setMessageClass(MapWritable.class);
-        partitioningJob.set("bsp.partitioning.runner.job", "true");
-        partitioningJob.getConfiguration().setBoolean(
-            Constants.ENABLE_RUNTIME_PARTITIONING, false);
-        partitioningJob.setOutputPath(partitionDir);
-
-        boolean isPartitioned = false;
-        try {
-          isPartitioned = partitioningJob.waitForCompletion(true);
-        } catch (InterruptedException e) {
-          LOG.error("Interrupted partitioning run-time.", e);
-        } catch (ClassNotFoundException e) {
-          LOG.error("Class not found error partitioning run-time.", e);
-        }
-
-        if (isPartitioned) {
-          if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
-            job.setInputPath(new Path(conf
-                .get(Constants.RUNTIME_PARTITIONING_DIR)));
-          } else {
-            job.setInputPath(partitionDir);
-          }
-          job.setBoolean("input.has.partitioned", true);
-          job.setInputFormat(NonSplitSequenceFileInputFormat.class);
-        } else {
-          LOG.error("Error partitioning the input path.");
-          throw new IOException("Runtime partition failed for the job.");
-        }
-      }
-    }
-    return job;
-  }
-
   protected RunningJob launchJob(BSPJobID jobId, BSPJob job,
       Path submitJobFile, FileSystem fs) throws IOException {
     //
@@ -569,17 +472,6 @@ public class BSPJobClient extends Config
       DataOutputBuffer buffer = new DataOutputBuffer();
       RawSplit rawSplit = new RawSplit();
       for (InputSplit split : splits) {
-
-        /*
-        // set partitionID to rawSplit
-        if (split.getClass().getName().equals(FileSplit.class.getName())) {
-          LOG.debug(((FileSplit) split).getPath().getName());
-          String[] extractPartitionID = ((FileSplit) split).getPath().getName()
-              .split("[-]");
-          rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
-        }
-        */
-
         rawSplit.setClassName(split.getClass().getName());
         buffer.reset();
         split.write(buffer);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu May 14 01:48:38 2015
@@ -630,11 +630,17 @@ public final class BSPPeerImpl<K1, V1, K
 
   @Override
   public final boolean readNext(K1 key, V1 value) throws IOException {
-    return in.next(key, value);
+    if(in != null)
+      return in.next(key, value);
+    else
+      return false;
   }
 
   @Override
   public final KeyValuePair<K1, V1> readNext() throws IOException {
+    if (split == null)
+      return null;
+
     K1 k = in.createKey();
     V1 v = in.createValue();
     if (in.next(k, v)) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Thu May 14 01:48:38 2015
@@ -262,15 +262,21 @@ public class JobInProgress {
       } finally {
         splitFile.close();
       }
-      numBSPTasks = splits.length;
-      LOG.info("num BSPTasks: " + numBSPTasks);
+      LOG.debug("numBSPTasks: " + numBSPTasks + ", splits.length: "
+          + splits.length);
 
       // adjust number of BSP tasks to actual number of splits
       this.tasks = new TaskInProgress[numBSPTasks];
-      for (int i = 0; i < numBSPTasks; i++) {
+      for (int i = 0; i < splits.length; i++) {
         tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
             splits[i], this.conf, this, i);
       }
+
+      for (int i = splits.length; i < numBSPTasks; i++) {
+        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
+            null, this.conf, this, i);
+      }
+
     } else {
       this.tasks = new TaskInProgress[numBSPTasks];
       for (int i = 0; i < numBSPTasks; i++) {
@@ -329,7 +335,9 @@ public class JobInProgress {
   }
 
   /**
-   * Gets the task for the first <code>TaskInProgress</code> that is not already
running
+   * Gets the task for the first <code>TaskInProgress</code> that is not already
+   * running
+   * 
    * @param groomStatuses Map of statuses for available groom servers
    * @return
    */
@@ -357,6 +365,7 @@ public class JobInProgress {
 
   /**
    * Creates a new task based on the provided <code>TaskInProgress</code>
+   * 
    * @param task The <code>TaskInProgress</code> for which to create a task
    * @param groomStatuses The statuses of available groom servers
    * @param resources Available resources of the given groom server
@@ -367,9 +376,8 @@ public class JobInProgress {
     Task result = null;
     String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
         groomStatuses, taskCountInGroomMap, resources, task);
-    GroomServerStatus groomStatus = taskAllocationStrategy
-        .getGroomToAllocate(groomStatuses, selectedGrooms,
-            taskCountInGroomMap, resources, task);
+    GroomServerStatus groomStatus = taskAllocationStrategy.getGroomToAllocate(
+        groomStatuses, selectedGrooms, taskCountInGroomMap, resources, task);
     if (groomStatus != null) {
       result = task.constructTask(groomStatus);
     } else if (LOG.isDebugEnabled()) {
@@ -381,19 +389,20 @@ public class JobInProgress {
     }
     return result;
   }
-  
+
   /**
    * Get the ideal grooms on which to run a given task for the provided
    * constraints
+   * 
    * @return
    */
-  public String[] getPreferredGrooms(TaskInProgress task, 
+  public String[] getPreferredGrooms(TaskInProgress task,
       Map<String, GroomServerStatus> groomStatuses, BSPResource[] resources) {
-    String[] grooms = taskAllocationStrategy.selectGrooms(
-	            groomStatuses, taskCountInGroomMap, resources, task);
+    String[] grooms = taskAllocationStrategy.selectGrooms(groomStatuses,
+        taskCountInGroomMap, resources, task);
     return grooms;
   }
-  
+
   public void recoverTasks(Map<String, GroomServerStatus> groomStatuses,
       Map<GroomServerStatus, List<GroomServerAction>> actionMap)
       throws IOException {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu May 14 01:48:38 2015
@@ -241,7 +241,7 @@ public class LocalBSPRunner implements J
 
       String splitname = null;
       BytesWritable realBytes = null;
-      if (splits != null) {
+      if (splits != null && splits.length > id) {
         splitname = splits[id].getClassName();
         realBytes = splits[id].getBytes();
       }
@@ -349,9 +349,9 @@ public class LocalBSPRunner implements J
     @Override
     public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
         throws IOException {
-      //peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
-      //    bundle.getLength());
-      
+      // peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
+      // bundle.getLength());
+
       MANAGER_MAP.get(addr).localQueueForNextIteration.addBundle(bundle);
       peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
           bundle.size());

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java Thu May 14 01:48:38 2015
@@ -163,6 +163,8 @@ public class TestKeyValueTextInputFormat
       job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
       job.setPartitioner(HashPartitioner.class);
 
+      // FIXME see 362 line at BSPJobClient.java
+      job.setNumBspTask(1);
       job.setInputPath(dataPath);
       job.setInputFormat(KeyValueTextInputFormat.class);
       job.setInputKeyClass(Text.class);
@@ -173,10 +175,6 @@ public class TestKeyValueTextInputFormat
       job.setOutputKeyClass(NullWritable.class);
       job.setOutputValueClass(NullWritable.class);
 
-      BSPJobClient jobClient = new BSPJobClient(conf);
-      ClusterStatus cluster = jobClient.getClusterStatus(true);
-      job.setNumBspTask(cluster.getMaxTasks());
-      
       assertEquals(true, job.waitForCompletion(true));
 
     } catch (Exception e) {

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Thu May 14 01:48:38 2015
@@ -56,7 +56,7 @@ public class PageRankTest extends TestCa
     generateTestData();
     try {
       PageRank.main(new String[] { "-input_path", INPUT, "-output_path",
-          OUTPUT, "-task_num", "3", "-f", "json" });
+          OUTPUT, "-task_num", "5", "-f", "json" });
       verifyResult();
     } catch (ParseException e) {
       e.printStackTrace();

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu May 14 01:48:38 2015
@@ -58,7 +58,8 @@ public class GraphJob extends BSPJob {
     super(conf);
     conf.setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
         OutgoingVertexMessageManager.class, OutgoingMessageManager.class);
-
+    conf.setBoolean("hama.bsp.force.set.tasks", true);
+    
     this.setBspClass(GraphJobRunner.class);
     this.setJarByClass(exampleClass);
     this.setVertexIDClass(Text.class);

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Thu May 14 01:48:38 2015
@@ -84,6 +84,8 @@ public class TestSubmitGraphJob extends
     // set the defaults
     bsp.setMaxIteration(30);
 
+    bsp.setNumBspTask(2);
+    
     bsp.setCompressionCodec(Bzip2Compressor.class);
     bsp.setAggregatorClass(AverageAggregator.class);
 

Modified: hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java (original)
+++ hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java Thu May 14 01:48:38 2015
@@ -31,41 +31,38 @@ import org.apache.hama.ml.recommendation
 import org.apache.hama.ml.recommendation.cf.function.MeanAbsError;
 import org.junit.Test;
 
-public class TestOnlineCF extends TestCase{
+public class TestOnlineCF extends TestCase {
   @SuppressWarnings({ "deprecation", "rawtypes", "unchecked" })
   @Test
   public void testOnlineCF() {
-    Preference[] train_prefs = {
-                        new Preference<Integer, Integer>(1, 1, 4),
-                        new Preference<Integer, Integer>(1, 2, 2.5),
-                        new Preference<Integer, Integer>(1, 3, 3.5),
-                        new Preference<Integer, Integer>(1, 4, 1),
-                        new Preference<Integer, Integer>(1, 5, 3.5),
-                        new Preference<Integer, Integer>(2, 1, 4),
-                        new Preference<Integer, Integer>(2, 2, 2.5),
-                        new Preference<Integer, Integer>(2, 3, 3.5),
-                        new Preference<Integer, Integer>(2, 4, 1),
-                        new Preference<Integer, Integer>(2, 5, 3.5),
-                        new Preference<Integer, Integer>(3, 1, 4),
-                        new Preference<Integer, Integer>(3, 2, 2.5),
-                        new Preference<Integer, Integer>(3, 3, 3.5)};
-    Preference[] test_prefs = {
-                        new Preference<Integer, Integer>(1, 3, 3.5),
-                        new Preference<Integer, Integer>(2, 4, 1),
-                        new Preference<Integer, Integer>(3, 4, 1),
-                        new Preference<Integer, Integer>(3, 5, 3.5)
-                      };
-    
+    Preference[] train_prefs = { new Preference<Integer, Integer>(1, 1, 4),
+        new Preference<Integer, Integer>(1, 2, 2.5),
+        new Preference<Integer, Integer>(1, 3, 3.5),
+        new Preference<Integer, Integer>(1, 4, 1),
+        new Preference<Integer, Integer>(1, 5, 3.5),
+        new Preference<Integer, Integer>(2, 1, 4),
+        new Preference<Integer, Integer>(2, 2, 2.5),
+        new Preference<Integer, Integer>(2, 3, 3.5),
+        new Preference<Integer, Integer>(2, 4, 1),
+        new Preference<Integer, Integer>(2, 5, 3.5),
+        new Preference<Integer, Integer>(3, 1, 4),
+        new Preference<Integer, Integer>(3, 2, 2.5),
+        new Preference<Integer, Integer>(3, 3, 3.5) };
+    Preference[] test_prefs = { new Preference<Integer, Integer>(1, 3, 3.5),
+        new Preference<Integer, Integer>(2, 4, 1),
+        new Preference<Integer, Integer>(3, 4, 1),
+        new Preference<Integer, Integer>(3, 5, 3.5) };
+
     Random rnd = new Random();
     Long num = Long.valueOf(rnd.nextInt(100000));
     String fileName = "onlinecf_train" + num.toString();
     String outputFileName = "onlinecf_model" + num.toString();
-    
+
     Configuration fsConf = new Configuration();
     String strDataPath = "/tmp/" + fileName;
     String convertedFileName = "/tmp/converted_" + fileName;
     Path dataPath = new Path(strDataPath);
-    
+
     try {
       URI uri = new URI(strDataPath);
       FileSystem fs = FileSystem.get(uri, fsConf);
@@ -83,10 +80,11 @@ public class TestOnlineCF extends TestCa
       }
       fileOut.writeBytes(str.toString());
       fileOut.close();
-      
+
       MovieLensConverter converter = new MovieLensConverter();
-      assertEquals(true, converter.convert(strDataPath, null, convertedFileName));
-      
+      assertEquals(true,
+          converter.convert(strDataPath, null, convertedFileName));
+
       OnlineCF recommender = new OnlineCF();
       recommender.setInputPreferences(convertedFileName);
       recommender.setIteration(150);
@@ -101,11 +99,12 @@ public class TestOnlineCF extends TestCa
       int correct = 0;
       for (Preference<Integer, Integer> test : test_prefs) {
         double actual = test.getValue().get();
-        double estimated = recommender.estimatePreference(test.getUserId(), test.getItemId());

-        correct += (Math.abs(actual-estimated)<0.5)?1:0;
+        double estimated = recommender.estimatePreference(test.getUserId(),
+            test.getItemId());
+        correct += (Math.abs(actual - estimated) < 0.5) ? 1 : 0;
       }
 
-      assertEquals(test_prefs.length*0.75, correct, 1);
+      assertEquals(test_prefs.length * 0.75, correct, 1);
 
       fs.delete(new Path(outputFileName));
       fs.delete(new Path(strDataPath));



Mime
View raw message