hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r727016 - in /hadoop/core/branches/branch-0.20: CHANGES.txt src/test/org/apache/hadoop/mapred/MRCaching.java src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
Date Tue, 16 Dec 2008 10:51:36 GMT
Author: cdouglas
Date: Tue Dec 16 02:51:36 2008
New Revision: 727016

URL: http://svn.apache.org/viewvc?rev=727016&view=rev
Log:
HADOOP-4458. Add a test creating symlinks in the working directory. Contributed by Amareshwari
Sriramadasu.

Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MRCaching.java
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=727016&r1=727015&r2=727016&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Tue Dec 16 02:51:36 2008
@@ -431,6 +431,9 @@
     HADOOP-4845. Modify the reduce input byte counter to record only the
     compressed size and add a human-readable label. (Yongqiang He via cdouglas)
 
+    HADOOP-4458. Add a test creating symlinks in the working directory.
+    (Amareshwari Sriramadasu via cdouglas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MRCaching.java?rev=727016&r1=727015&r2=727016&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MRCaching.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MRCaching.java Tue Dec 16 02:51:36 2008
@@ -110,6 +110,55 @@
   }
 
   /**
+   * Using the wordcount example and adding caching to it. The cache
+   * archives/files are set and then are checked in the map if they have been
+   * symlinked or not.
+   */
+  public static class MapClass2 extends MapClass {
+    
+    JobConf conf;
+
+    public void configure(JobConf jconf) {
+      conf = jconf;
+      try {
+        // read the cached files (unzipped, unjarred and text)
+        // and put it into a single file TEST_ROOT_DIR/test.txt
+        String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp");
+        Path file = new Path("file:///", TEST_ROOT_DIR);
+        FileSystem fs = FileSystem.getLocal(conf);
+        if (!fs.mkdirs(file)) {
+          throw new IOException("Mkdirs failed to create " + file.toString());
+        }
+        Path fileOut = new Path(file, "test.txt");
+        fs.delete(fileOut, true);
+        DataOutputStream out = fs.create(fileOut); 
+        String[] symlinks = new String[6];
+        symlinks[0] = ".";
+        symlinks[1] = "testjar";
+        symlinks[2] = "testzip";
+        symlinks[3] = "testtgz";
+        symlinks[4] = "testtargz";
+        symlinks[5] = "testtar";
+
+        for (int i = 0; i < symlinks.length; i++) {
+          // read out the files from these archives
+          File f = new File(symlinks[i]);
+          File txt = new File(f, "test.txt");
+          FileInputStream fin = new FileInputStream(txt);
+          BufferedReader reader = new BufferedReader(new InputStreamReader(fin));
+          String str = reader.readLine();
+          reader.close();
+          out.writeBytes(str);
+          out.writeBytes("\n");
+        }
+        out.close();
+      } catch (IOException ie) {
+        System.out.println(StringUtils.stringifyException(ie));
+      }
+    }
+  }
+
+  /**
    * A reducer class that just emits the sum of the input values.
    */
   public static class ReduceClass extends MapReduceBase
@@ -135,9 +184,40 @@
     }
   }
 
+  static void setupCache(String cacheDir, FileSystem fs) 
+  throws IOException {
+    Path localPath = new Path("build/test/cache");
+    Path txtPath = new Path(localPath, new Path("test.txt"));
+    Path jarPath = new Path(localPath, new Path("test.jar"));
+    Path zipPath = new Path(localPath, new Path("test.zip"));
+    Path tarPath = new Path(localPath, new Path("test.tgz"));
+    Path tarPath1 = new Path(localPath, new Path("test.tar.gz"));
+    Path tarPath2 = new Path(localPath, new Path("test.tar"));
+    Path cachePath = new Path(cacheDir);
+    fs.delete(cachePath, true);
+    if (!fs.mkdirs(cachePath)) {
+      throw new IOException("Mkdirs failed to create " + cachePath.toString());
+    }
+    fs.copyFromLocalFile(txtPath, cachePath);
+    fs.copyFromLocalFile(jarPath, cachePath);
+    fs.copyFromLocalFile(zipPath, cachePath);
+    fs.copyFromLocalFile(tarPath, cachePath);
+    fs.copyFromLocalFile(tarPath1, cachePath);
+    fs.copyFromLocalFile(tarPath2, cachePath);
+  }
+ 
+  public static TestResult launchMRCache(String indir,
+                                         String outdir, String cacheDir, 
+                                         JobConf conf, String input) 
+  throws IOException {
+    setupCache(cacheDir, FileSystem.get(conf));
+    return launchMRCache(indir,outdir, cacheDir, conf, input, false); 
+  }
+  
   public static TestResult launchMRCache(String indir,
                                          String outdir, String cacheDir, 
-                                         JobConf conf, String input)
+                                         JobConf conf, String input,
+                                         boolean withSymlink)
     throws IOException {
     String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp"))
       .toString().replace(' ', '+');
@@ -163,7 +243,6 @@
     // the values are counts (ints)
     conf.setOutputValueClass(IntWritable.class);
 
-    conf.setMapperClass(MRCaching.MapClass.class);
     conf.setCombinerClass(MRCaching.ReduceClass.class);
     conf.setReducerClass(MRCaching.ReduceClass.class);
     FileInputFormat.setInputPaths(conf, inDir);
@@ -171,38 +250,29 @@
     conf.setNumMapTasks(1);
     conf.setNumReduceTasks(1);
     conf.setSpeculativeExecution(false);
-    Path localPath = new Path("build/test/cache");
-    Path txtPath = new Path(localPath, new Path("test.txt"));
-    Path jarPath = new Path(localPath, new Path("test.jar"));
-    Path zipPath = new Path(localPath, new Path("test.zip"));
-    Path tarPath = new Path(localPath, new Path("test.tgz"));
-    Path tarPath1 = new Path(localPath, new Path("test.tar.gz"));
-    Path tarPath2 = new Path(localPath, new Path("test.tar"));
-    Path cachePath = new Path(cacheDir);
-    fs.delete(cachePath, true);
-    if (!fs.mkdirs(cachePath)) {
-      throw new IOException("Mkdirs failed to create " + cachePath.toString());
+    URI[] uris = new URI[6];
+    if (!withSymlink) {
+      conf.setMapperClass(MRCaching.MapClass.class);
+      uris[0] = fs.getUri().resolve(cacheDir + "/test.txt");
+      uris[1] = fs.getUri().resolve(cacheDir + "/test.jar");
+      uris[2] = fs.getUri().resolve(cacheDir + "/test.zip");
+      uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
+      uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
+      uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
+    } else {
+      DistributedCache.createSymlink(conf);
+      conf.setMapperClass(MRCaching.MapClass2.class);
+      uris[0] = fs.getUri().resolve(cacheDir + "/test.txt#" + "test.txt");
+      uris[1] = fs.getUri().resolve(cacheDir + "/test.jar#" + "testjar");
+      uris[2] = fs.getUri().resolve(cacheDir + "/test.zip#" + "testzip");
+      uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz#" + "testtgz");
+      uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz#" + "testtargz");
+      uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar");
+    }
+    DistributedCache.addCacheFile(uris[0], conf);
+    for (int i = 1; i < 6; i++) {
+      DistributedCache.addCacheArchive(uris[i], conf);
     }
-    fs.copyFromLocalFile(txtPath, cachePath);
-    fs.copyFromLocalFile(jarPath, cachePath);
-    fs.copyFromLocalFile(zipPath, cachePath);
-    fs.copyFromLocalFile(tarPath, cachePath);
-    fs.copyFromLocalFile(tarPath1, cachePath);
-    fs.copyFromLocalFile(tarPath2, cachePath);
-    // setting the cached archives to zip, jar and simple text files
-    URI uri1 = fs.getUri().resolve(cachePath + "/test.jar");
-    URI uri2 = fs.getUri().resolve(cachePath + "/test.zip");
-    URI uri3 = fs.getUri().resolve(cachePath + "/test.txt");
-    URI uri4 = fs.getUri().resolve(cachePath + "/test.tgz");
-    URI uri5 = fs.getUri().resolve(cachePath + "/test.tar.gz");
-    URI uri6 = fs.getUri().resolve(cachePath + "/test.tar");
-
-    DistributedCache.addCacheArchive(uri1, conf);
-    DistributedCache.addCacheArchive(uri2, conf);
-    DistributedCache.addCacheFile(uri3, conf);
-    DistributedCache.addCacheArchive(uri4, conf);
-    DistributedCache.addCacheArchive(uri5, conf);
-    DistributedCache.addCacheArchive(uri6, conf);
     RunningJob job = JobClient.runJob(conf);
     int count = 0;
     // after the job ran check to see if the input from the localized cache

Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?rev=727016&r1=727015&r2=727016&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Tue Dec 16 02:51:36 2008
@@ -41,13 +41,22 @@
       dfs = new MiniDFSCluster(conf, 1, true, null);
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(2, fileSys.getName(), 4);
+      MRCaching.setupCache("/cachedir", fileSys);
       // run the wordcount example with caching
       TestResult ret = MRCaching.launchMRCache("/testing/wc/input",
                                             "/testing/wc/output",
                                             "/cachedir",
                                             mr.createJobConf(),
                                             "The quick brown fox\nhas many silly\n"
-                                            + "red fox sox\n");
+                                            + "red fox sox\n", false);
+      assertTrue("Archives not matching", ret.isOutputOk);
+      // launch MR cache with symlinks
+      ret = MRCaching.launchMRCache("/testing/wc/input",
+                                    "/testing/wc/output",
+                                    "/cachedir",
+                                    mr.createJobConf(),
+                                    "The quick brown fox\nhas many silly\n"
+                                    + "red fox sox\n", true);
       assertTrue("Archives not matching", ret.isOutputOk);
     } finally {
       if (fileSys != null) {



Mime
View raw message