hadoop-mapreduce-commits mailing list archives

From szets...@apache.org
Subject svn commit: r1399950 [7/11] - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduc...
Date Fri, 19 Oct 2012 02:28:42 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Fri Oct 19 02:25:55 2012
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -136,6 +138,43 @@ public class TestFileInputFormat {
     }
   }
 
+  /**
+   * Test when the input file's length is 0.
+   */
+  @Test
+  public void testForEmptyFile() throws Exception {
+      Configuration conf = new Configuration();
+      FileSystem fileSys = FileSystem.get(conf);
+      Path file = new Path("test" + "/file");
+      FSDataOutputStream out = fileSys.create(file, true,
+              conf.getInt("io.file.buffer.size", 4096), (short) 1, (long) 1024);
+      out.write(new byte[0]);
+      out.close();
+
+      // split it using a File input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = Job.getInstance(conf);
+      FileInputFormat.setInputPaths(job, "test");
+      List<InputSplit> splits = inFormat.getSplits(job);
+      assertEquals(1, splits.size());
+      FileSplit fileSplit = (FileSplit) splits.get(0);
+      assertEquals(0, fileSplit.getLocations().length);
+      assertEquals(file.getName(), fileSplit.getPath().getName());
+      assertEquals(0, fileSplit.getStart());
+      assertEquals(0, fileSplit.getLength());
+
+      fileSys.delete(file.getParent(), true);
+  }
+
+  /** Dummy class to extend FileInputFormat. */
+  private class DummyInputFormat extends FileInputFormat<Text, Text> {
+    @Override
+    public RecordReader<Text,Text> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException {
+      return null;
+    }
+  }
+
   private class FileInputFormatForTest<K, V> extends FileInputFormat<K, V> {
 
     long splitSize;

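The new test pins down FileInputFormat's contract for zero-length inputs: an empty file still yields exactly one split, with zero start, zero length, and an empty location array, so the file stays visible to the job. A minimal caller-side sketch of that contract (reusing the test's DummyInputFormat and imports; not part of the commit):

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    fs.create(new Path("test/file")).close();      // zero-length file

    Job job = Job.getInstance(conf);
    FileInputFormat.setInputPaths(job, "test");
    List<InputSplit> splits = new DummyInputFormat().getSplits(job);
    FileSplit only = (FileSplit) splits.get(0);    // exactly one split
    // only.getStart() == 0, only.getLength() == 0,
    // only.getLocations().length == 0
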
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestTotalOrderPartitioner.java Fri Oct 19 02:25:55 2012
@@ -21,19 +21,25 @@ package org.apache.hadoop.mapreduce.lib.
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 
 public class TestTotalOrderPartitioner extends TestCase {
@@ -51,6 +57,19 @@ public class TestTotalOrderPartitioner e
     new Text("yak"),   // 9
   };
 
+  private static final String[] splitJavaStrings = new String[] {
+    // -inf            // 0
+    new String("aabbb"), // 1
+    new String("babbb"), // 2
+    new String("daddd"), // 3
+    new String("dddee"), // 4
+    new String("ddhee"), // 5
+    new String("dingo"), // 6
+    new String("hijjj"), // 7
+    new String("n"),     // 8
+    new String("yak"),   // 9
+  };
+
   static class Check<T> {
     T data;
     int part;
@@ -76,19 +95,41 @@ public class TestTotalOrderPartitioner e
     testStrings.add(new Check<Text>(new Text("hi"), 6));
   };
 
-  private static <T extends WritableComparable<?>> Path writePartitionFile(
+  private static final ArrayList<Check<String>> testJavaStrings =
+      new ArrayList<Check<String>>();
+    static {
+      testJavaStrings.add(new Check<String>(new String("aaaaa"), 0));
+      testJavaStrings.add(new Check<String>(new String("aaabb"), 0));
+      testJavaStrings.add(new Check<String>(new String("aabbb"), 1));
+      testJavaStrings.add(new Check<String>(new String("aaaaa"), 0));
+      testJavaStrings.add(new Check<String>(new String("babbb"), 2));
+      testJavaStrings.add(new Check<String>(new String("baabb"), 1));
+      testJavaStrings.add(new Check<String>(new String("yai"), 8));
+      testJavaStrings.add(new Check<String>(new String("yak"), 9));
+      testJavaStrings.add(new Check<String>(new String("z"), 9));
+      testJavaStrings.add(new Check<String>(new String("ddngo"), 5));
+      testJavaStrings.add(new Check<String>(new String("hi"), 6));
+    };
+
+
+  private static <T> Path writePartitionFile(
       String testname, Configuration conf, T[] splits) throws IOException {
     final FileSystem fs = FileSystem.getLocal(conf);
     final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
-                                 ).makeQualified(fs);
+                                 ).makeQualified(
+                                     fs.getUri(),
+                                     fs.getWorkingDirectory());
     Path p = new Path(testdir, testname + "/_partition.lst");
     TotalOrderPartitioner.setPartitionFile(conf, p);
     conf.setInt(MRJobConfig.NUM_REDUCES, splits.length + 1);
     SequenceFile.Writer w = null;
     try {
-      w = SequenceFile.createWriter(fs, conf, p,
-          splits[0].getClass(), NullWritable.class,
-          SequenceFile.CompressionType.NONE);
+      w = SequenceFile.createWriter(
+          conf,
+          SequenceFile.Writer.file(p),
+          SequenceFile.Writer.keyClass(splits[0].getClass()),
+          SequenceFile.Writer.valueClass(NullWritable.class),
+          SequenceFile.Writer.compression(CompressionType.NONE));
       for (int i = 0; i < splits.length; ++i) {
         w.append(splits[i], NullWritable.get());
       }
@@ -99,6 +140,31 @@ public class TestTotalOrderPartitioner e
     return p;
   }
 
+  public void testTotalOrderWithCustomSerialization() throws Exception {
+    TotalOrderPartitioner<String, NullWritable> partitioner =
+        new TotalOrderPartitioner<String, NullWritable>();
+    Configuration conf = new Configuration();
+    conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+        JavaSerialization.class.getName(),
+        WritableSerialization.class.getName());
+    conf.setClass(MRJobConfig.KEY_COMPARATOR,
+        JavaSerializationComparator.class,
+        Comparator.class);
+    Path p = TestTotalOrderPartitioner.<String>writePartitionFile(
+        "totalordercustomserialization", conf, splitJavaStrings);
+    conf.setClass(MRJobConfig.MAP_OUTPUT_KEY_CLASS, String.class, Object.class);
+    try {
+      partitioner.setConf(conf);
+      NullWritable nw = NullWritable.get();
+      for (Check<String> chk : testJavaStrings) {
+        assertEquals(chk.data.toString(), chk.part,
+            partitioner.getPartition(chk.data, nw, splitJavaStrings.length + 1));
+      }
+    } finally {
+      p.getFileSystem(conf).delete(p, true);
+    }
+  }
+
   public void testTotalOrderMemCmp() throws Exception {
     TotalOrderPartitioner<Text,NullWritable> partitioner =
       new TotalOrderPartitioner<Text,NullWritable>();

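Two deprecations are addressed in this hunk: Path.makeQualified(FileSystem) is replaced by makeQualified(URI, Path), and the FileSystem/Configuration overload of SequenceFile.createWriter is replaced by the Writer.Option form; the writePartitionFile bound is also loosened from WritableComparable to a plain type parameter so Java-serialized String keys qualify. A before/after sketch of the two migrations (reusing the method's fs, conf, testdir, and p locals; not part of the commit):

    // Deprecated forms this commit removes:
    //   Path q = testdir.makeQualified(fs);
    //   SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, p,
    //       Text.class, NullWritable.class, CompressionType.NONE);

    // Replacement forms it adopts:
    Path q = testdir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    SequenceFile.Writer w = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(p),                        // output path
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(NullWritable.class),
        SequenceFile.Writer.compression(CompressionType.NONE));
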
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java Fri Oct 19 02:25:55 2012
@@ -88,8 +88,10 @@ public class TestUmbilicalProtocolWithJo
       .when(mockTT).getProtocolSignature(anyString(), anyLong(), anyInt());
 
     JobTokenSecretManager sm = new JobTokenSecretManager();
-    final Server server = RPC.getServer(TaskUmbilicalProtocol.class, mockTT,
-        ADDRESS, 0, 5, true, conf, sm);
+    final Server server = new RPC.Builder(conf)
+        .setProtocol(TaskUmbilicalProtocol.class).setInstance(mockTT)
+        .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+        .setSecretManager(sm).build();
 
     server.start();
 

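The removed RPC.getServer overload packed its parameters positionally; the RPC.Builder that replaces it names each one. The same construction with each setter annotated (a sketch of the builder call from the hunk above; mockTT, ADDRESS, and sm are the test's locals):

    final Server server = new RPC.Builder(conf)
        .setProtocol(TaskUmbilicalProtocol.class) // RPC interface to serve
        .setInstance(mockTT)                      // implementation (a mock here)
        .setBindAddress(ADDRESS)                  // host to bind
        .setPort(0)                               // 0 = pick any free port
        .setNumHandlers(5)                        // RPC handler threads
        .setVerbose(true)
        .setSecretManager(sm)                     // job-token authentication
        .build();
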
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Fri Oct 19 02:25:55 2012
@@ -72,8 +72,10 @@ public class MiniMRYarnCluster extends M
   @Override
   public void init(Configuration conf) {
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
-    conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
-        "apps_staging_dir/").getAbsolutePath());
+    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+          "apps_staging_dir/").getAbsolutePath());
+    }
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
 
     try {
@@ -113,10 +115,6 @@ public class MiniMRYarnCluster extends M
     // for corresponding uberized tests.
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
 
-    // Set config for JH Server
-    conf.set(JHAdminConfig.MR_HISTORY_ADDRESS,
-        JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS);
-
     super.init(conf);
   }
 
@@ -128,10 +126,15 @@ public class MiniMRYarnCluster extends M
     @Override
     public synchronized void start() {
       try {
-        getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
-                        MiniYARNCluster.getHostname() + ":0");
-        getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
-                        MiniYARNCluster.getHostname() + ":0");
+        if (!getConfig().getBoolean(
+            JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS,
+            JHAdminConfig.DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS)) {
+          // pick free random ports.
+          getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
+              MiniYARNCluster.getHostname() + ":0");
+          getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
+              MiniYARNCluster.getHostname() + ":0");
+        }
         historyServer = new JobHistoryServer();
         historyServer.init(getConfig());
         new Thread() {

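Both changes here make the mini cluster overridable from the caller's side: the staging directory is only defaulted when unset, and the history server only picks random free ports when the new fixed-ports flag is off. A caller-side sketch (assuming the single-argument MiniMRYarnCluster constructor; "example" is a hypothetical test name):

    Configuration conf = new Configuration();
    // Keep the default JobHistoryServer addresses rather than random ports:
    conf.setBoolean(JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS, true);
    // Pre-set the staging dir; init() now leaves it alone when present:
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
    MiniMRYarnCluster cluster = new MiniMRYarnCluster("example");
    cluster.init(conf);
    cluster.start();
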
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Fri Oct 19 02:25:55 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -26,6 +27,7 @@ import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
+import org.apache.commons.io.FileUtils;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,10 +41,10 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -66,6 +68,7 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -77,15 +80,24 @@ public class TestMRJobs {
   private static final Log LOG = LogFactory.getLog(TestMRJobs.class);
 
   protected static MiniMRYarnCluster mrCluster;
+  protected static MiniDFSCluster dfsCluster;
 
   private static Configuration conf = new Configuration();
   private static FileSystem localFs;
+  private static FileSystem remoteFs;
   static {
     try {
       localFs = FileSystem.getLocal(conf);
     } catch (IOException io) {
       throw new RuntimeException("problem getting local fs", io);
     }
+    try {
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
   }
 
   private static Path TEST_ROOT_DIR = new Path("target",
@@ -104,6 +116,8 @@ public class TestMRJobs {
     if (mrCluster == null) {
       mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
       Configuration conf = new Configuration();
+      conf.set("fs.defaultFS", remoteFs.getUri().toString());   // use HDFS
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
       mrCluster.init(conf);
       mrCluster.start();
     }
@@ -120,6 +134,10 @@ public class TestMRJobs {
       mrCluster.stop();
       mrCluster = null;
     }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
   }
 
   @Test
@@ -211,6 +229,7 @@ public class TestMRJobs {
     Path outputDir =
         new Path(mrCluster.getTestWorkDir().getAbsolutePath(), "random-output");
     FileOutputFormat.setOutputPath(job, outputDir);
+    job.setSpeculativeExecution(false);
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.setJarByClass(RandomTextWriterJob.class);
     job.setMaxMapAttempts(1); // speed up failures
@@ -399,20 +418,20 @@ public class TestMRJobs {
       Configuration conf = context.getConfiguration();
       Path[] files = context.getLocalCacheFiles();
       Path[] archives = context.getLocalCacheArchives();
-      FileSystem fs = LocalFileSystem.get(conf);
 
-      // Check that 3(2+ appjar) files and 2 archives are present
-      Assert.assertEquals(3, files.length);
+      // Check that 4 (2 + appjar + DistributedCacheChecker jar) files
+      // and 2 archives are present
+      Assert.assertEquals(4, files.length);
       Assert.assertEquals(2, archives.length);
 
       // Check lengths of the files
-      Assert.assertEquals(1, fs.getFileStatus(files[0]).getLen());
-      Assert.assertTrue(fs.getFileStatus(files[1]).getLen() > 1);
+      Assert.assertEquals(1, localFs.getFileStatus(files[1]).getLen());
+      Assert.assertTrue(localFs.getFileStatus(files[2]).getLen() > 1);
 
       // Check extraction of the archive
-      Assert.assertTrue(fs.exists(new Path(archives[0],
+      Assert.assertTrue(localFs.exists(new Path(archives[0],
           "distributed.jar.inside3")));
-      Assert.assertTrue(fs.exists(new Path(archives[1],
+      Assert.assertTrue(localFs.exists(new Path(archives[1],
           "distributed.jar.inside4")));
 
       // Check the class loaders
@@ -423,16 +442,27 @@ public class TestMRJobs {
       Assert.assertNotNull(cl.getResource("distributed.jar.inside2"));
       Assert.assertNotNull(cl.getResource("distributed.jar.inside3"));
       Assert.assertNotNull(cl.getResource("distributed.jar.inside4"));
+      // The Job Jar should have been extracted to a folder named "job.jar" and
+      // added to the classpath; the two jar files in the lib folder in the Job
+      // Jar should have also been added to the classpath
+      Assert.assertNotNull(cl.getResource("job.jar/"));
+      Assert.assertNotNull(cl.getResource("job.jar/lib/lib1.jar"));
+      Assert.assertNotNull(cl.getResource("job.jar/lib/lib2.jar"));
 
       // Check that the symlink for the renaming was created in the cwd;
       File symlinkFile = new File("distributed.first.symlink");
       Assert.assertTrue(symlinkFile.exists());
       Assert.assertEquals(1, symlinkFile.length());
+      
+      // Check that the symlink for the Job Jar was created in the cwd and
+      // points to the extracted directory
+      File jobJarDir = new File("job.jar");
+      Assert.assertTrue(FileUtils.isSymlink(jobJarDir));
+      Assert.assertTrue(jobJarDir.isDirectory());
     }
   }
 
-  @Test
-  public void testDistributedCache() throws Exception {
+  public void _testDistributedCache(String jobJarPath) throws Exception {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
            + " not found. Not running test.");
@@ -450,7 +480,17 @@ public class TestMRJobs {
         makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
 
     Job job = Job.getInstance(mrCluster.getConfig());
-    job.setJarByClass(DistributedCacheChecker.class);
+    
+    // Set the job jar to a new "dummy" jar so we can check that it's
+    // extracted properly
+    job.setJar(jobJarPath);
+    // Because the job jar is a "dummy" jar, we need to include the jar with
+    // DistributedCacheChecker or it won't be able to find it
+    Path distributedCacheCheckerJar = new Path(
+            JarFinder.getJar(DistributedCacheChecker.class));
+    job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
+            localFs.getUri(), distributedCacheCheckerJar.getParent()));
+    
     job.setMapperClass(DistributedCacheChecker.class);
     job.setOutputFormatClass(NullOutputFormat.class);
 
@@ -459,10 +499,11 @@ public class TestMRJobs {
     job.addCacheFile(
         new URI(first.toUri().toString() + "#distributed.first.symlink"));
     job.addFileToClassPath(second);
-    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    // The AppMaster jar itself
+    job.addFileToClassPath(
+            APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent())); 
     job.addArchiveToClassPath(third);
     job.addCacheArchive(fourth.toUri());
-    job.createSymlink();
     job.setMaxMapAttempts(1); // speed up failures
 
     job.submit();
@@ -473,6 +514,23 @@ public class TestMRJobs {
                      " but didn't match Job ID " + jobId,
          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
   }
+  
+  @Test
+  public void testDistributedCache() throws Exception {
+    // Test with a local (file:///) Job Jar
+    Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
+    _testDistributedCache(localJobJarPath.toUri().toString());
+    
+    // Test with a remote (hdfs://) Job Jar
+    Path remoteJobJarPath = new Path(remoteFs.getUri().toString() + "/",
+            localJobJarPath.getName());
+    remoteFs.moveFromLocalFile(localJobJarPath, remoteJobJarPath);
+    File localJobJarFile = new File(localJobJarPath.toUri().toString());
+    if (localJobJarFile.exists()) {     // just to make sure
+        localJobJarFile.delete();
+    }
+    _testDistributedCache(remoteJobJarPath.toUri().toString());
+  }
 
   private Path createTempFile(String filename, String contents)
       throws IOException {
@@ -497,4 +555,45 @@ public class TestMRJobs {
     localFs.setPermission(p, new FsPermission("700"));
     return p;
   }
+  
+  private Path makeJobJarWithLib(String testDir) throws FileNotFoundException, 
+      IOException{
+    Path jobJarPath = new Path(testDir, "thejob.jar");
+    FileOutputStream fos =
+        new FileOutputStream(new File(jobJarPath.toUri().getPath()));
+    JarOutputStream jos = new JarOutputStream(fos);
+    // Have to put in real jar files or it will complain
+    createAndAddJarToJar(jos, new File(
+            new Path(testDir, "lib1.jar").toUri().getPath()));
+    createAndAddJarToJar(jos, new File(
+            new Path(testDir, "lib2.jar").toUri().getPath()));
+    jos.close();
+    localFs.setPermission(jobJarPath, new FsPermission("700"));
+    return jobJarPath;
+  }
+  
+  private void createAndAddJarToJar(JarOutputStream jos, File jarFile) 
+          throws FileNotFoundException, IOException {
+    FileOutputStream fos2 = new FileOutputStream(jarFile);
+    JarOutputStream jos2 = new JarOutputStream(fos2);
+    // Have to have at least one entry or it will complain
+    ZipEntry ze = new ZipEntry("lib1.inside");
+    jos2.putNextEntry(ze);
+    jos2.closeEntry();
+    jos2.close();
+    ze = new ZipEntry("lib/" + jarFile.getName());
+    jos.putNextEntry(ze);
+    FileInputStream in = new FileInputStream(jarFile);
+    byte[] buf = new byte[1024];
+    int numRead;
+    do {
+      numRead = in.read(buf);
+      if (numRead >= 0) {
+        jos.write(buf, 0, numRead);
+      }
+    } while (numRead != -1);
+    in.close();
+    jos.closeEntry();
+    jarFile.delete();
+  }
 }

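A recurring idiom in the TestMRJobs hunks: once fs.defaultFS points at the MiniDFSCluster, a bare local path handed to addFileToClassPath would resolve against HDFS, so local jars are explicitly qualified against the local FileSystem first. The idiom in isolation (a sketch reusing the test's localFs and APP_JAR; not part of the commit):

    // Qualify a local jar so it stays a file:// URI under an HDFS defaultFS:
    job.addFileToClassPath(
        APP_JAR.makeQualified(localFs.getUri(), APP_JAR.getParent()));
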
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java Fri Oct 19 02:25:55 2012
@@ -301,7 +301,6 @@ public class TestSpeculativeExecution {
 
     // Creates the Job Configuration
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
-    job.createSymlink();
     job.setMaxMapAttempts(2);
 
     job.submit();

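The job.createSymlink() call removed here (and the DistributedCache.createSymlink(conf) call removed from Sort.java below) is obsolete because MR2 always creates symlinks for cache entries; the URI fragment alone names the link. A minimal sketch (hypothetical file and fragment):

    // No createSymlink() needed: the "#dict" fragment is sufficient, and the
    // task working directory will contain a "dict" symlink to the cached file.
    job.addCacheFile(new URI("hdfs:///data/dict.txt#dict"));
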
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java Fri Oct 19 02:25:55 2012
@@ -177,8 +177,13 @@ public class TestYARNRunner extends Test
   @Test
   public void testResourceMgrDelegate() throws Exception {
    /* we do not want a mock of the resourcemgr delegate */
-    ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
-    ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf, clientRMProtocol);
+    final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
+    ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) {
+      @Override
+      public synchronized void start() {
+        this.rmClient = clientRMProtocol;
+      }
+    };
     /* make sure kill calls finish application master */
     when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
     .thenReturn(null);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java Fri Oct 19 02:25:55 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.TestSequ
 import org.apache.hadoop.mapred.TestTextInputFormat;
 import org.apache.hadoop.mapred.ThreadedMapBenchmark;
 import org.apache.hadoop.mapreduce.FailJob;
+import org.apache.hadoop.mapreduce.MiniHadoopClusterManager;
 import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ProgramDriver;
 
@@ -101,6 +102,8 @@ public class MapredTestDriver {
           "Job History Log analyzer.");
       pgd.addClass(SliveTest.class.getSimpleName(), SliveTest.class, 
           "HDFS Stress Test and Live Data Verification.");
+      pgd.addClass("minicluster", MiniHadoopClusterManager.class,
+          "Single process HDFS and MR cluster.");
     } catch(Throwable e) {
       e.printStackTrace();
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Fri Oct 19 02:25:55 2012
@@ -55,7 +55,10 @@ import org.apache.hadoop.fs.LocalDirAllo
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -84,9 +87,7 @@ import org.jboss.netty.channel.ChannelHa
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.DefaultFileRegion;
 import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.FileRegion;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.group.ChannelGroup;
@@ -101,6 +102,7 @@ import org.jboss.netty.handler.codec.htt
 import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.jboss.netty.handler.ssl.SslHandler;
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 import org.jboss.netty.util.CharsetUtil;
 
@@ -110,10 +112,27 @@ public class ShuffleHandler extends Abst
     implements AuxServices.AuxiliaryService {
 
   private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
+  
+  public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
+  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+  public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
+  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
 
   private int port;
   private ChannelFactory selector;
   private final ChannelGroup accepted = new DefaultChannelGroup();
+  private HttpPipelineFactory pipelineFact;
+  private int sslFileBufferSize;
+
+  /**
+   * Should the shuffle use posix_fadvise calls to manage the OS cache during
+   * sendfile
+   */
+  private boolean manageOsCache;
+  private int readaheadLength;
+  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+   
 
   public static final String MAPREDUCE_SHUFFLE_SERVICEID =
       "mapreduce.shuffle";
@@ -126,6 +145,11 @@ public class ShuffleHandler extends Abst
   public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
   public static final int DEFAULT_SHUFFLE_PORT = 8080;
 
+  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
+    "mapreduce.shuffle.ssl.file.buffer.size";
+
+  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
+
   @Metrics(about="Shuffle output metrics", context="mapred")
   static class ShuffleMetrics implements ChannelFutureListener {
     @Metric("Shuffle output in bytes")
@@ -231,6 +255,12 @@ public class ShuffleHandler extends Abst
 
   @Override
   public synchronized void init(Configuration conf) {
+    manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
+        DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+    readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
+        DEFAULT_SHUFFLE_READAHEAD_BYTES);
+    
     ThreadFactory bossFactory = new ThreadFactoryBuilder()
       .setNameFormat("ShuffleHandler Netty Boss #%d")
       .build();
@@ -249,7 +279,11 @@ public class ShuffleHandler extends Abst
   public synchronized void start() {
     Configuration conf = getConfig();
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf);
+    try {
+      pipelineFact = new HttpPipelineFactory(conf);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
     bootstrap.setPipelineFactory(pipelineFact);
     port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
     Channel ch = bootstrap.bind(new InetSocketAddress(port));
@@ -259,6 +293,9 @@ public class ShuffleHandler extends Abst
     pipelineFact.SHUFFLE.setPort(port);
     LOG.info(getName() + " listening on port " + port);
     super.start();
+
+    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
   }
 
   @Override
@@ -266,6 +303,7 @@ public class ShuffleHandler extends Abst
     accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
     bootstrap.releaseExternalResources();
+    pipelineFact.destroy();
     super.stop();
   }
 
@@ -283,22 +321,38 @@ public class ShuffleHandler extends Abst
   class HttpPipelineFactory implements ChannelPipelineFactory {
 
     final Shuffle SHUFFLE;
+    private SSLFactory sslFactory;
 
-    public HttpPipelineFactory(Configuration conf) {
+    public HttpPipelineFactory(Configuration conf) throws Exception {
       SHUFFLE = new Shuffle(conf);
+      if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+                          MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
+        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+        sslFactory.init();
+      }
+    }
+
+    public void destroy() {
+      if (sslFactory != null) {
+        sslFactory.destroy();
+      }
     }
 
     @Override
     public ChannelPipeline getPipeline() throws Exception {
-        return Channels.pipeline(
-            new HttpRequestDecoder(),
-            new HttpChunkAggregator(1 << 16),
-            new HttpResponseEncoder(),
-            new ChunkedWriteHandler(),
-            SHUFFLE);
-        // TODO factor security manager into pipeline
-        // TODO factor out encode/decode to permit binary shuffle
-        // TODO factor out decode of index to permit alt. models
+      ChannelPipeline pipeline = Channels.pipeline();
+      if (sslFactory != null) {
+        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+      }
+      pipeline.addLast("decoder", new HttpRequestDecoder());
+      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+      pipeline.addLast("encoder", new HttpResponseEncoder());
+      pipeline.addLast("chunking", new ChunkedWriteHandler());
+      pipeline.addLast("shuffle", SHUFFLE);
+      return pipeline;
+      // TODO factor security manager into pipeline
+      // TODO factor out encode/decode to permit binary shuffle
+      // TODO factor out decode of index to permit alt. models
     }
 
   }
@@ -468,14 +522,14 @@ public class ShuffleHandler extends Abst
           base + "/file.out", conf);
       LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : " +
           indexFileName);
-      IndexRecord info = 
+      final IndexRecord info = 
         indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
       final ShuffleHeader header =
         new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
       final DataOutputBuffer dob = new DataOutputBuffer();
       header.write(dob);
       ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-      File spillfile = new File(mapOutputFileName.toString());
+      final File spillfile = new File(mapOutputFileName.toString());
       RandomAccessFile spill;
       try {
         spill = new RandomAccessFile(spillfile, "r");
@@ -483,17 +537,28 @@ public class ShuffleHandler extends Abst
         LOG.info(spillfile + " not found");
         return null;
       }
-      final FileRegion partition = new DefaultFileRegion(
-          spill.getChannel(), info.startOffset, info.partLength);
-      ChannelFuture writeFuture = ch.write(partition);
-      writeFuture.addListener(new ChannelFutureListener() {
-          // TODO error handling; distinguish IO/connection failures,
-          //      attribute to appropriate spill output
+      ChannelFuture writeFuture;
+      if (ch.getPipeline().get(SslHandler.class) == null) {
+        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+            info.startOffset, info.partLength, manageOsCache, readaheadLength,
+            readaheadPool, spillfile.getAbsolutePath());
+        writeFuture = ch.write(partition);
+        writeFuture.addListener(new ChannelFutureListener() {
+            // TODO error handling; distinguish IO/connection failures,
+            //      attribute to appropriate spill output
           @Override
           public void operationComplete(ChannelFuture future) {
             partition.releaseExternalResources();
           }
         });
+      } else {
+        // HTTPS cannot be done with zero copy.
+        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+            info.startOffset, info.partLength, sslFileBufferSize,
+            manageOsCache, readaheadLength, readaheadPool,
+            spillfile.getAbsolutePath());
+        writeFuture = ch.write(chunk);
+      }
       metrics.shuffleConnections.incr();
       metrics.shuffleOutputBytes.incr(info.partLength); // optimistic
       return writeFuture;

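The shuffle server now optionally fronts its Netty pipeline with an SslHandler and, because zero-copy sendfile cannot run under TLS, switches from FadvisedFileRegion to a buffered FadvisedChunkedFile for HTTPS responses. The knobs this commit reads, gathered in one place (a sketch, not part of the commit; SSL defaults to off, and the three tuning values shown are the defaults from the code above):

    Configuration conf = new Configuration();
    conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, true);          // HTTPS shuffle
    conf.setInt("mapreduce.shuffle.ssl.file.buffer.size", 60 * 1024); // SSL chunk buffer
    conf.setBoolean("mapreduce.shuffle.manage.os.cache", true);       // posix_fadvise hints
    conf.setInt("mapreduce.shuffle.readahead.bytes", 4 * 1024 * 1024);
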
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml Fri Oct 19 02:25:55 2012
@@ -133,7 +133,31 @@
       <groupId>org.jboss.netty</groupId>
       <artifactId>netty</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
  
   <build>
@@ -148,6 +172,18 @@
           <effort>Max</effort>
        </configuration>
      </plugin>
+     <plugin>
+       <groupId>org.apache.maven.plugins</groupId>
+       <artifactId>maven-surefire-plugin</artifactId>
+       <configuration>
+         <properties>
+           <property>
+             <name>listener</name>
+             <value>org.apache.hadoop.test.TimedOutTestsListener</value>
+           </property>
+         </properties>
+       </configuration>
+     </plugin>
     </plugins>
   </build>
  

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml Fri Oct 19 02:25:55 2012
@@ -36,6 +36,14 @@
 
   <dependencies>
     <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
       <scope>provided</scope>
@@ -88,17 +96,12 @@
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-hs</artifactId>
-       <scope>provided</scope>
-     </dependency>
-     <dependency>
-       <groupId>org.apache.hadoop</groupId>
-       <artifactId>hadoop-mapreduce-client-hs</artifactId>
        <scope>test</scope>
      </dependency>
      <dependency>
        <groupId>org.hsqldb</groupId>
        <artifactId>hsqldb</artifactId>
-       <version>2.0.0</version>
+       <scope>provided</scope>
      </dependency>
   </dependencies>
   

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/DBCountPageView.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/DBCountPageView.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/DBCountPageView.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/DBCountPageView.java Fri Oct 19 02:25:55 2012
@@ -27,7 +27,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Iterator;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -65,6 +64,16 @@ import org.hsqldb.server.Server;
  * 
  * When called with no arguments the program starts a local HSQLDB server, and 
  * uses this database for storing/retrieving the data. 
+ * <br>
+ * This program requires some additional configuration relating to HSQLDB.  
+ * The hsqldb jar should be added to the classpath:
+ * <br>
+ * <code>export HADOOP_CLASSPATH=share/hadoop/mapreduce/lib-examples/hsqldb-2.0.0.jar</code>
+ * <br>
+ * And the hsqldb jar should be included with the <code>-libjars</code> 
+ * argument when executing it with hadoop:
+ * <br>
+ * <code>-libjars share/hadoop/mapreduce/lib-examples/hsqldb-2.0.0.jar</code>
  */
 public class DBCountPageView extends Configured implements Tool {
 
@@ -72,6 +81,7 @@ public class DBCountPageView extends Con
   
   private Connection connection;
   private boolean initialized = false;
+  private boolean isOracle = false;
 
   private static final String[] AccessFieldNames = {"url", "referrer", "time"};
   private static final String[] PageviewFieldNames = {"url", "pageview"};
@@ -92,7 +102,9 @@ public class DBCountPageView extends Con
   
   private void createConnection(String driverClassName
       , String url) throws Exception {
-    
+    if(driverClassName.toLowerCase().contains("oracle")) {
+      isOracle = true;
+    }
     Class.forName(driverClassName);
     connection = DriverManager.getConnection(url);
     connection.setAutoCommit(false);
@@ -132,7 +144,7 @@ public class DBCountPageView extends Con
   }
   
   private void dropTables() {
-    String dropAccess = "DROP TABLE Access";
+    String dropAccess = "DROP TABLE HAccess";
     String dropPageview = "DROP TABLE Pageview";
     Statement st = null;
     try {
@@ -147,18 +159,21 @@ public class DBCountPageView extends Con
   }
   
   private void createTables() throws SQLException {
-
+    String dataType = "BIGINT NOT NULL";
+    if (isOracle) {
+      dataType = "NUMBER(19) NOT NULL";
+    }
     String createAccess = 
       "CREATE TABLE " +
-      "Access(url      VARCHAR(100) NOT NULL," +
+      "HAccess(url      VARCHAR(100) NOT NULL," +
             " referrer VARCHAR(100)," +
-            " time     BIGINT NOT NULL, " +
+            " time     " + dataType + ", " +
             " PRIMARY KEY (url, time))";
 
     String createPageview = 
       "CREATE TABLE " +
       "Pageview(url      VARCHAR(100) NOT NULL," +
-              " pageview     BIGINT NOT NULL, " +
+              " pageview     " + dataType + ", " +
                " PRIMARY KEY (url))";
     
     Statement st = connection.createStatement();
@@ -179,7 +194,7 @@ public class DBCountPageView extends Con
     PreparedStatement statement = null ;
     try {
       statement = connection.prepareStatement(
-          "INSERT INTO Access(url, referrer, time)" +
+          "INSERT INTO HAccess(url, referrer, time)" +
           " VALUES (?, ?, ?)");
 
       Random random = new Random();
@@ -238,7 +253,7 @@ public class DBCountPageView extends Con
   /**Verifies the results are correct */
   private boolean verify() throws SQLException {
     //check total num pageview
-    String countAccessQuery = "SELECT COUNT(*) FROM Access";
+    String countAccessQuery = "SELECT COUNT(*) FROM HAccess";
     String sumPageviewQuery = "SELECT SUM(pageview) FROM Pageview";
     Statement st = null;
     ResultSet rs = null;
@@ -386,7 +401,7 @@ public class DBCountPageView extends Con
 
     DBConfiguration.configureDB(conf, driverClassName, url);
 
-    Job job = new Job(conf);
+    Job job = Job.getInstance(conf);
         
     job.setJobName("Count Pageviews of URLs");
     job.setJarByClass(DBCountPageView.class);
@@ -394,7 +409,7 @@ public class DBCountPageView extends Con
     job.setCombinerClass(LongSumReducer.class);
     job.setReducerClass(PageviewReducer.class);
 
-    DBInputFormat.setInput(job, AccessRecord.class, "Access"
+    DBInputFormat.setInput(job, AccessRecord.class, "HAccess"
         , null, "url", AccessFieldNames);
 
     DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);

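Alongside the Oracle-aware column type and the Access-to-HAccess rename (likely because ACCESS collides with a reserved word in newer HSQLDB), the driver moves off the deprecated Job constructor. The replacement idiom (a sketch):

    // Job.getInstance(Configuration) supersedes the deprecated new Job(conf):
    Job job = Job.getInstance(conf);
    job.setJobName("Count Pageviews of URLs");
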
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/SecondarySort.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/SecondarySort.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/SecondarySort.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/SecondarySort.java Fri Oct 19 02:25:55 2012
@@ -211,7 +211,7 @@ public class SecondarySort {
     Configuration conf = new Configuration();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
     if (otherArgs.length != 2) {
-      System.err.println("Usage: secondarysrot <in> <out>");
+      System.err.println("Usage: secondarysort <in> <out>");
       System.exit(2);
     }
     Job job = new Job(conf, "secondary sort");

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/Sort.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/Sort.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/Sort.java Fri Oct 19 02:25:55 2012
@@ -167,7 +167,6 @@ public class Sort<K,V> extends Configure
       URI partitionUri = new URI(partitionFile.toString() +
                                  "#" + "_sortPartitioning");
       DistributedCache.addCacheFile(partitionUri, conf);
-      DistributedCache.createSymlink(conf);
     }
 
     System.out.println("Running on " +

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMean.java Fri Oct 19 02:25:55 2012
@@ -1,196 +1,196 @@
-package org.apache.hadoop.examples;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-public class WordMean extends Configured implements Tool {
-
-  private double mean = 0;
-
-  private final static Text COUNT = new Text("count");
-  private final static Text LENGTH = new Text("length");
-  private final static LongWritable ONE = new LongWritable(1);
-
-  /**
-   * Maps words from line of text into 2 key-value pairs; one key-value pair for
-   * counting the word, another for counting its length.
-   */
-  public static class WordMeanMapper extends
-      Mapper<Object, Text, Text, LongWritable> {
-
-    private LongWritable wordLen = new LongWritable();
-
-    /**
-     * Emits 2 key-value pairs for counting the word and its length. Outputs are
-     * (Text, LongWritable).
-     * 
-     * @param value
-     *          This will be a line of text coming in from our input file.
-     */
-    public void map(Object key, Text value, Context context)
-        throws IOException, InterruptedException {
-      StringTokenizer itr = new StringTokenizer(value.toString());
-      while (itr.hasMoreTokens()) {
-        String string = itr.nextToken();
-        this.wordLen.set(string.length());
-        context.write(LENGTH, this.wordLen);
-        context.write(COUNT, ONE);
-      }
-    }
-  }
-
-  /**
-   * Performs integer summation of all the values for each key.
-   */
-  public static class WordMeanReducer extends
-      Reducer<Text, LongWritable, Text, LongWritable> {
-
-    private LongWritable sum = new LongWritable();
-
-    /**
-     * Sums all the individual values within the iterator and writes them to the
-     * same key.
-     * 
-     * @param key
-     *          This will be one of 2 constants: LENGTH_STR or COUNT_STR.
-     * @param values
-     *          This will be an iterator of all the values associated with that
-     *          key.
-     */
-    public void reduce(Text key, Iterable<LongWritable> values, Context context)
-        throws IOException, InterruptedException {
-
-      int theSum = 0;
-      for (LongWritable val : values) {
-        theSum += val.get();
-      }
-      sum.set(theSum);
-      context.write(key, sum);
-    }
-  }
-
-  /**
-   * Reads the output file and parses the summation of lengths, and the word
-   * count, to perform a quick calculation of the mean.
-   * 
-   * @param path
-   *          The path to find the output file in. Set in main to the output
-   *          directory.
-   * @throws IOException
-   *           If it cannot access the output directory, we throw an exception.
-   */
-  private double readAndCalcMean(Path path, Configuration conf)
-      throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path file = new Path(path, "part-r-00000");
-
-    if (!fs.exists(file))
-      throw new IOException("Output not found!");
-
-    BufferedReader br = null;
-
-    // average = total sum / number of elements;
-    try {
-      br = new BufferedReader(new InputStreamReader(fs.open(file)));
-
-      long count = 0;
-      long length = 0;
-
-      String line;
-      while ((line = br.readLine()) != null) {
-        StringTokenizer st = new StringTokenizer(line);
-
-        // grab type
-        String type = st.nextToken();
-
-        // differentiate
-        if (type.equals(COUNT.toString())) {
-          String countLit = st.nextToken();
-          count = Long.parseLong(countLit);
-        } else if (type.equals(LENGTH.toString())) {
-          String lengthLit = st.nextToken();
-          length = Long.parseLong(lengthLit);
-        }
-      }
-
-      double theMean = (((double) length) / ((double) count));
-      System.out.println("The mean is: " + theMean);
-      return theMean;
-    } finally {
-      br.close();
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new WordMean(), args);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: wordmean <in> <out>");
-      return 0;
-    }
-
-    Configuration conf = getConf();
-
-    @SuppressWarnings("deprecation")
-    Job job = new Job(conf, "word mean");
-    job.setJarByClass(WordMean.class);
-    job.setMapperClass(WordMeanMapper.class);
-    job.setCombinerClass(WordMeanReducer.class);
-    job.setReducerClass(WordMeanReducer.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(LongWritable.class);
-    FileInputFormat.addInputPath(job, new Path(args[0]));
-    Path outputpath = new Path(args[1]);
-    FileOutputFormat.setOutputPath(job, outputpath);
-    boolean result = job.waitForCompletion(true);
-    mean = readAndCalcMean(outputpath, conf);
-
-    return (result ? 0 : 1);
-  }
-
-  /**
-   * Only valuable after run() called.
-   * 
-   * @return Returns the mean value.
-   */
-  public double getMean() {
-    return mean;
-  }
+package org.apache.hadoop.examples;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class WordMean extends Configured implements Tool {
+
+  private double mean = 0;
+
+  private final static Text COUNT = new Text("count");
+  private final static Text LENGTH = new Text("length");
+  private final static LongWritable ONE = new LongWritable(1);
+
+  /**
+   * Maps each word in a line of text to two key-value pairs: one for
+   * counting the word, another for accumulating its length.
+   */
+  public static class WordMeanMapper extends
+      Mapper<Object, Text, Text, LongWritable> {
+
+    private LongWritable wordLen = new LongWritable();
+
+    /**
+     * Emits 2 key-value pairs for counting the word and its length. Outputs are
+     * (Text, LongWritable).
+     * 
+     * @param value
+     *          This will be a line of text coming in from our input file.
+     */
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        String string = itr.nextToken();
+        this.wordLen.set(string.length());
+        context.write(LENGTH, this.wordLen);
+        context.write(COUNT, ONE);
+      }
+    }
+  }
+
+  /**
+   * Performs integer summation of all the values for each key.
+   */
+  public static class WordMeanReducer extends
+      Reducer<Text, LongWritable, Text, LongWritable> {
+
+    private LongWritable sum = new LongWritable();
+
+    /**
+     * Sums all the individual values within the iterator and writes the
+     * total back out under the same key.
+     * 
+     * @param key
+     *          This will be one of the two constants defined above: COUNT or LENGTH.
+     * @param values
+     *          This will be an iterator of all the values associated with that
+     *          key.
+     */
+    public void reduce(Text key, Iterable<LongWritable> values, Context context)
+        throws IOException, InterruptedException {
+
+      long theSum = 0; // long, not int: the incoming values are LongWritable
+      for (LongWritable val : values) {
+        theSum += val.get();
+      }
+      sum.set(theSum);
+      context.write(key, sum);
+    }
+  }
+
+  /**
+   * Reads the output file and parses the summation of lengths, and the word
+   * count, to perform a quick calculation of the mean.
+   * 
+   * @param path
+   *          The path to find the output file in. Set in main to the output
+   *          directory.
+   * @throws IOException
+   *           If the output file cannot be opened or read.
+   */
+  private double readAndCalcMean(Path path, Configuration conf)
+      throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path file = new Path(path, "part-r-00000");
+
+    if (!fs.exists(file))
+      throw new IOException("Output not found!");
+
+    BufferedReader br = null;
+
+    // average = total sum / number of elements;
+    try {
+      br = new BufferedReader(new InputStreamReader(fs.open(file)));
+
+      long count = 0;
+      long length = 0;
+
+      String line;
+      while ((line = br.readLine()) != null) {
+        StringTokenizer st = new StringTokenizer(line);
+
+        // grab type
+        String type = st.nextToken();
+
+        // differentiate
+        if (type.equals(COUNT.toString())) {
+          String countLit = st.nextToken();
+          count = Long.parseLong(countLit);
+        } else if (type.equals(LENGTH.toString())) {
+          String lengthLit = st.nextToken();
+          length = Long.parseLong(lengthLit);
+        }
+      }
+
+      double theMean = (((double) length) / ((double) count));
+      System.out.println("The mean is: " + theMean);
+      return theMean;
+    } finally {
+      if (br != null) { // guard: fs.open may fail before br is assigned
+        br.close();
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new WordMean(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println("Usage: wordmean <in> <out>");
+      return 2; // non-zero exit status signals a usage error
+    }
+
+    Configuration conf = getConf();
+
+    @SuppressWarnings("deprecation")
+    Job job = new Job(conf, "word mean");
+    job.setJarByClass(WordMean.class);
+    job.setMapperClass(WordMeanMapper.class);
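+    // Reusing the reducer as a combiner is safe here: per-key summation is
+    // associative and commutative.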
+    job.setCombinerClass(WordMeanReducer.class);
+    job.setReducerClass(WordMeanReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(LongWritable.class);
+    FileInputFormat.addInputPath(job, new Path(args[0]));
+    Path outputpath = new Path(args[1]);
+    FileOutputFormat.setOutputPath(job, outputpath);
+    boolean result = job.waitForCompletion(true);
+    mean = readAndCalcMean(outputpath, conf);
+
+    return (result ? 0 : 1);
+  }
+
+  /**
+   * Only valid after run() has been called.
+   *
+   * @return the mean value
+   */
+  public double getMean() {
+    return mean;
+  }
 }
\ No newline at end of file
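
A brief note on driving the example above: WordMean implements Tool, so besides
"hadoop jar ... wordmean <in> <out>" it can also be invoked programmatically.
A minimal sketch, assuming the examples jar and a Hadoop client are on the
classpath; WordMeanDemo and the "in"/"out" paths are placeholders, not part of
this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordMean;
import org.apache.hadoop.util.ToolRunner;

public class WordMeanDemo {
  public static void main(String[] args) throws Exception {
    WordMean tool = new WordMean();
    // With no cluster configuration this runs against the local filesystem.
    int rc = ToolRunner.run(new Configuration(), tool, new String[] { "in", "out" });
    // getMean() is only meaningful once run() has completed.
    System.out.println("exit=" + rc + ", mean=" + tool.getMean());
  }
}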

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordMedian.java Fri Oct 19 02:25:55 2012
@@ -1,208 +1,208 @@
-package org.apache.hadoop.examples;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-public class WordMedian extends Configured implements Tool {
-
-  private double median = 0;
-  private final static IntWritable ONE = new IntWritable(1);
-
-  /**
-   * Maps words from line of text into a key-value pair; the length of the word
-   * as the key, and 1 as the value.
-   */
-  public static class WordMedianMapper extends
-      Mapper<Object, Text, IntWritable, IntWritable> {
-
-    private IntWritable length = new IntWritable();
-
-    /**
-     * Emits a key-value pair for counting the word. Outputs are (IntWritable,
-     * IntWritable).
-     * 
-     * @param value
-     *          This will be a line of text coming in from our input file.
-     */
-    public void map(Object key, Text value, Context context)
-        throws IOException, InterruptedException {
-      StringTokenizer itr = new StringTokenizer(value.toString());
-      while (itr.hasMoreTokens()) {
-        String string = itr.nextToken();
-        length.set(string.length());
-        context.write(length, ONE);
-      }
-    }
-  }
-
-  /**
-   * Performs integer summation of all the values for each key.
-   */
-  public static class WordMedianReducer extends
-      Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
-
-    private IntWritable val = new IntWritable();
-
-    /**
-     * Sums all the individual values within the iterator and writes them to the
-     * same key.
-     * 
-     * @param key
-     *          This will be a length of a word that was read.
-     * @param values
-     *          This will be an iterator of all the values associated with that
-     *          key.
-     */
-    public void reduce(IntWritable key, Iterable<IntWritable> values,
-        Context context) throws IOException, InterruptedException {
-
-      int sum = 0;
-      for (IntWritable value : values) {
-        sum += value.get();
-      }
-      val.set(sum);
-      context.write(key, val);
-    }
-  }
-
-  /**
-   * This is a standard program to read and find a median value based on a file
-   * of word counts such as: 1 456, 2 132, 3 56... Where the first values are
-   * the word lengths and the following values are the number of times that
-   * words of that length appear.
-   * 
-   * @param path
-   *          The path to read the HDFS file from (part-r-00000...00001...etc).
-   * @param medianIndex1
-   *          The first length value to look for.
-   * @param medianIndex2
-   *          The second length value to look for (will be the same as the first
-   *          if there are an even number of words total).
-   * @throws IOException
-   *           If file cannot be found, we throw an exception.
-   * */
-  private double readAndFindMedian(String path, int medianIndex1,
-      int medianIndex2, Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path file = new Path(path, "part-r-00000");
-
-    if (!fs.exists(file))
-      throw new IOException("Output not found!");
-
-    BufferedReader br = null;
-
-    try {
-      br = new BufferedReader(new InputStreamReader(fs.open(file)));
-      int num = 0;
-
-      String line;
-      while ((line = br.readLine()) != null) {
-        StringTokenizer st = new StringTokenizer(line);
-
-        // grab length
-        String currLen = st.nextToken();
-
-        // grab count
-        String lengthFreq = st.nextToken();
-
-        int prevNum = num;
-        num += Integer.parseInt(lengthFreq);
-
-        if (medianIndex2 >= prevNum && medianIndex1 <= num) {
-          System.out.println("The median is: " + currLen);
-          br.close();
-          return Double.parseDouble(currLen);
-        } else if (medianIndex2 >= prevNum && medianIndex1 < num) {
-          String nextCurrLen = st.nextToken();
-          double theMedian = (Integer.parseInt(currLen) + Integer
-              .parseInt(nextCurrLen)) / 2.0;
-          System.out.println("The median is: " + theMedian);
-          br.close();
-          return theMedian;
-        }
-      }
-    } finally {
-      br.close();
-    }
-    // error, no median found
-    return -1;
-  }
-
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new Configuration(), new WordMedian(), args);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: wordmedian <in> <out>");
-      return 0;
-    }
-
-    setConf(new Configuration());
-    Configuration conf = getConf();
-
-    @SuppressWarnings("deprecation")
-    Job job = new Job(conf, "word median");
-    job.setJarByClass(WordMedian.class);
-    job.setMapperClass(WordMedianMapper.class);
-    job.setCombinerClass(WordMedianReducer.class);
-    job.setReducerClass(WordMedianReducer.class);
-    job.setOutputKeyClass(IntWritable.class);
-    job.setOutputValueClass(IntWritable.class);
-    FileInputFormat.addInputPath(job, new Path(args[0]));
-    FileOutputFormat.setOutputPath(job, new Path(args[1]));
-    boolean result = job.waitForCompletion(true);
-
-    // Wait for JOB 1 -- get middle value to check for Median
-
-    long totalWords = job.getCounters()
-        .getGroup(TaskCounter.class.getCanonicalName())
-        .findCounter("MAP_OUTPUT_RECORDS", "Map output records").getValue();
-    int medianIndex1 = (int) Math.ceil((totalWords / 2.0));
-    int medianIndex2 = (int) Math.floor((totalWords / 2.0));
-
-    median = readAndFindMedian(args[1], medianIndex1, medianIndex2, conf);
-
-    return (result ? 0 : 1);
-  }
-
-  public double getMedian() {
-    return median;
-  }
-}
+package org.apache.hadoop.examples;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class WordMedian extends Configured implements Tool {
+
+  private double median = 0;
+  private final static IntWritable ONE = new IntWritable(1);
+
+  /**
+   * Maps each word in a line of text to a key-value pair: the length of the
+   * word as the key, and 1 as the value.
+   */
+  public static class WordMedianMapper extends
+      Mapper<Object, Text, IntWritable, IntWritable> {
+
+    private IntWritable length = new IntWritable();
+
+    /**
+     * Emits a key-value pair for counting the word. Outputs are (IntWritable,
+     * IntWritable).
+     * 
+     * @param value
+     *          This will be a line of text coming in from our input file.
+     */
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        String string = itr.nextToken();
+        length.set(string.length());
+        context.write(length, ONE);
+      }
+    }
+  }
+
+  /**
+   * Performs integer summation of all the values for each key.
+   */
+  public static class WordMedianReducer extends
+      Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+    private IntWritable val = new IntWritable();
+
+    /**
+     * Sums all the individual values within the iterator and writes the
+     * total back out under the same key.
+     * 
+     * @param key
+     *          This will be a length of a word that was read.
+     * @param values
+     *          This will be an iterator of all the values associated with that
+     *          key.
+     */
+    public void reduce(IntWritable key, Iterable<IntWritable> values,
+        Context context) throws IOException, InterruptedException {
+
+      int sum = 0;
+      for (IntWritable value : values) {
+        sum += value.get();
+      }
+      val.set(sum);
+      context.write(key, val);
+    }
+  }
+
+  /**
+   * Reads a file of word-length counts such as "1 456", "2 132", "3 56", ...,
+   * where the first column is a word length and the second is the number of
+   * times words of that length appear, and locates the median length.
+   * 
+   * @param path
+   *          The path to read the HDFS file from (part-r-00000...00001...etc).
+   * @param medianIndex1
+   *          The first length value to look for.
+   * @param medianIndex2
+   *          The second length value to look for (will be the same as the first
+   *          if there are an even number of words total).
+   * @throws IOException
+   *           If the file cannot be found or read.
+   */
+  private double readAndFindMedian(String path, int medianIndex1,
+      int medianIndex2, Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path file = new Path(path, "part-r-00000");
+
+    if (!fs.exists(file))
+      throw new IOException("Output not found!");
+
+    BufferedReader br = null;
+
+    try {
+      br = new BufferedReader(new InputStreamReader(fs.open(file)));
+      int num = 0;
+
+      String line;
+      while ((line = br.readLine()) != null) {
+        StringTokenizer st = new StringTokenizer(line);
+
+        // grab length
+        String currLen = st.nextToken();
+
+        // grab count
+        String lengthFreq = st.nextToken();
+
+        int prevNum = num;
+        num += Integer.parseInt(lengthFreq);
+
+        if (medianIndex2 >= prevNum && medianIndex1 <= num) {
+          System.out.println("The median is: " + currLen);
+          br.close();
+          return Double.parseDouble(currLen);
+        } else if (medianIndex2 >= prevNum && medianIndex1 < num) {
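+          // Note: this branch appears unreachable as written --
+          // (medianIndex1 < num) implies (medianIndex1 <= num), so the
+          // condition above already matched; see the note after this diff.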
+          String nextCurrLen = st.nextToken();
+          double theMedian = (Integer.parseInt(currLen) + Integer
+              .parseInt(nextCurrLen)) / 2.0;
+          System.out.println("The median is: " + theMedian);
+          br.close();
+          return theMedian;
+        }
+      }
+    } finally {
+      if (br != null) { // guard: fs.open may fail before br is assigned
+        br.close();
+      }
+    }
+    // error, no median found
+    return -1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new WordMedian(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println("Usage: wordmedian <in> <out>");
+      return 2; // non-zero exit status signals a usage error
+    }
+
+    // Keep the configuration handed in by ToolRunner; resetting it here would
+    // discard generic options such as -D and -conf.
+    Configuration conf = getConf();
+
+    @SuppressWarnings("deprecation")
+    Job job = new Job(conf, "word median");
+    job.setJarByClass(WordMedian.class);
+    job.setMapperClass(WordMedianMapper.class);
+    job.setCombinerClass(WordMedianReducer.class);
+    job.setReducerClass(WordMedianReducer.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(job, new Path(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    boolean result = job.waitForCompletion(true);
+
+    // The job has completed; use its counters to locate the middle rank(s).
+
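+    // MAP_OUTPUT_RECORDS counts every (length, ONE) pair the mappers emitted,
+    // i.e. the total number of words in the input.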
+    long totalWords = job.getCounters()
+        .getGroup(TaskCounter.class.getCanonicalName())
+        .findCounter("MAP_OUTPUT_RECORDS", "Map output records").getValue();
+    int medianIndex1 = (int) Math.ceil((totalWords / 2.0));
+    int medianIndex2 = (int) Math.floor((totalWords / 2.0));
+
+    median = readAndFindMedian(args[1], medianIndex1, medianIndex2, conf);
+
+    return (result ? 0 : 1);
+  }
+
+  public double getMedian() {
+    return median;
+  }
+}
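
A note on the median logic above: since medianIndex1 = ceil(n/2) and
medianIndex2 = floor(n/2), the test (medianIndex1 < num) implies
(medianIndex1 <= num), so the averaging branch in readAndFindMedian can never
fire -- for non-empty input the method always returns from the first branch.
For an even n the two middle ranks are n/2 and n/2 + 1, which this index
choice does not produce. A self-contained sketch of a histogram median that
does average the two middle elements (HistogramMedian is a hypothetical
helper, not part of this commit):

import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class HistogramMedian {
  /** Median word length for a (length -> frequency) histogram. */
  static double median(SortedMap<Integer, Long> freq) {
    long n = 0;
    for (long c : freq.values()) {
      n += c;
    }
    long lo = (n + 1) / 2;                // 1-based rank of the lower middle element
    long hi = (n % 2 == 0) ? lo + 1 : lo; // upper middle rank (same rank when n is odd)
    long seen = 0;
    Integer lower = null;
    for (Map.Entry<Integer, Long> e : freq.entrySet()) {
      seen += e.getValue();
      if (lower == null && seen >= lo) {
        lower = e.getKey();               // bucket holding the lower middle rank
      }
      if (seen >= hi) {
        return (lower + e.getKey()) / 2.0; // average the two middle elements
      }
    }
    throw new IllegalStateException("empty histogram");
  }

  public static void main(String[] args) {
    SortedMap<Integer, Long> freq = new TreeMap<Integer, Long>();
    freq.put(1, 2L); // two words of length 1
    freq.put(3, 1L); // one word of length 3
    freq.put(5, 1L); // one word of length 5
    System.out.println(median(freq)); // lengths 1,1,3,5 -> median 2.0
  }
}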


