mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject svn commit: r1295352 - /mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
Date Thu, 01 Mar 2012 00:24:07 GMT
Author: tcp
Date: Thu Mar  1 00:24:07 2012
New Revision: 1295352

URL: http://svn.apache.org/viewvc?rev=1295352&view=rev
Log:
MAHOUT-980: Fix DistributedCache usage to allow EMR deployment

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=1295352&r1=1295351&r2=1295352&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Thu Mar  1 00:24:07 2012
@@ -96,15 +96,28 @@ public final class PFPGrowth {
    */
   public static List<Pair<String,Long>> readFList(Configuration conf) throws
IOException {
     List<Pair<String,Long>> list = new ArrayList<Pair<String,Long>>();
-    URI[] files = DistributedCache.getCacheFiles(conf);
+    Path[] files = DistributedCache.getLocalCacheFiles(conf);
     if (files == null) {
       throw new IOException("Cannot read Frequency list from Distributed Cache");
     }
     if (files.length != 1) {
       throw new IOException("Cannot read Frequency list from Distributed Cache ("+files.length+")");
     }
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path fListLocalPath = fs.makeQualified(files[0]);
+    // Fallback if we are running locally.
+    if (! fs.exists(fListLocalPath)) {
+      URI[] filesURIs = DistributedCache.getCacheFiles(conf);
+      if (filesURIs == null) {
+        throw new IOException("Cannot read Frequency list from Distributed Cache");
+      }
+      if (filesURIs.length != 1) {
+        throw new IOException("Cannot read Frequency list from Distributed Cache ("+files.length+")");
+      }
+      fListLocalPath = new Path(filesURIs[0].getPath());
+    }
     for (Pair<Text,LongWritable> record :
-         new SequenceFileIterable<Text,LongWritable>(new Path(files[0].getPath()),
true, conf)) {
+         new SequenceFileIterable<Text,LongWritable>(fListLocalPath, true, conf)) {
       list.add(new Pair<String,Long>(record.getFirst().toString(), record.getSecond().get()));
     }
     return list;



Mime
View raw message