hadoop-common-commits mailing list archives

From cnaur...@apache.org
Subject svn commit: r1495297 [28/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...
Date Fri, 21 Jun 2013 06:37:39 GMT
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Fri Jun 21 06:37:27 2013
@@ -20,7 +20,9 @@ package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Set;
@@ -33,6 +35,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
 
@@ -74,7 +79,7 @@ public abstract class CombineFileInputFo
   private ArrayList<MultiPathFilter> pools = new  ArrayList<MultiPathFilter>();
 
   // mapping from a rack name to the set of Nodes in the rack 
-  private static HashMap<String, Set<String>> rackToNodes = 
+  private HashMap<String, Set<String>> rackToNodes = 
                             new HashMap<String, Set<String>>();
   /**
    * Specify the maximum size (in bytes) of each split. Each split is
@@ -127,6 +132,16 @@ public abstract class CombineFileInputFo
     pools.add(multi);
   }
 
+  @Override
+  protected boolean isSplitable(FileSystem fs, Path file) {
+    final CompressionCodec codec =
+      new CompressionCodecFactory(fs.getConf()).getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
+  }
+  
   /**
    * default constructor
    */
@@ -180,24 +195,31 @@ public abstract class CombineFileInputFo
     if (paths.length == 0) {
       return splits.toArray(new CombineFileSplit[splits.size()]);    
     }
+    
+    // Qualify the paths first. This is a costly operation, so doing
+    // it once up front avoids repeating it for every pool in the
+    // next loop.
+    List<Path> newpaths = new LinkedList<Path>();
+    for (int i = 0; i < paths.length; i++) {
+      FileSystem fs = paths[i].getFileSystem(job);
+      Path p = fs.makeQualified(paths[i]);
+      newpaths.add(p);
+    }
+    paths = null;
 
     // In one single iteration, process all the paths in a single pool.
-    // Processing one pool at a time ensures that a split contans paths
+    // Processing one pool at a time ensures that a split contains paths
     // from a single pool only.
     for (MultiPathFilter onepool : pools) {
       ArrayList<Path> myPaths = new ArrayList<Path>();
       
       // pick one input path. If it matches all the filters in a pool,
       // add it to the output set
-      for (int i = 0; i < paths.length; i++) {
-        if (paths[i] == null) {  // already processed
-          continue;
-        }
-        FileSystem fs = paths[i].getFileSystem(job);
-        Path p = fs.makeQualified(paths[i]);
+      for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
+        Path p = iter.next();
         if (onepool.accept(p)) {
-          myPaths.add(paths[i]); // add it to my output set
-          paths[i] = null;       // already processed
+          myPaths.add(p); // add it to my output set
+          iter.remove();
         }
       }
       // create splits for all files in this pool.
@@ -205,16 +227,8 @@ public abstract class CombineFileInputFo
                     maxSize, minSizeNode, minSizeRack, splits);
     }
 
-    // Finally, process all paths that do not belong to any pool.
-    ArrayList<Path> myPaths = new ArrayList<Path>();
-    for (int i = 0; i < paths.length; i++) {
-      if (paths[i] == null) {  // already processed
-        continue;
-      }
-      myPaths.add(paths[i]);
-    }
     // create splits for all files that are not in any pool.
-    getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
+    getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]), 
                   maxSize, minSizeNode, minSizeRack, splits);
 
     // free up rackToNodes map
@@ -253,13 +267,15 @@ public abstract class CombineFileInputFo
     // populate all the blocks for all files
     long totLength = 0;
     for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], job, 
-                                 rackToBlocks, blockToNodes, nodeToBlocks);
+      files[i] = new OneFileInfo(paths[i], job,
+                                  isSplitable(paths[i].getFileSystem(job), paths[i]),
+                                 rackToBlocks, blockToNodes, nodeToBlocks,
+                                 rackToNodes, maxSize);
       totLength += files[i].getLength();
     }
 
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
-    ArrayList<String> nodes = new ArrayList<String>();
+    Set<String> nodes = new HashSet<String>();
     long curSplitSize = 0;
 
     // process all nodes and create splits that are local
@@ -312,7 +328,7 @@ public abstract class CombineFileInputFo
     // in 'overflow'. After the processing of all racks is complete, these overflow
     // blocks will be combined into splits.
     ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
-    ArrayList<String> racks = new ArrayList<String>();
+    Set<String> racks = new HashSet<String>();
 
     // Process all racks over and over again until there is no more work to do.
     while (blockToNodes.size() > 0) {
@@ -417,7 +433,7 @@ public abstract class CombineFileInputFo
    */
   private void addCreatedSplit(JobConf job,
                                List<CombineFileSplit> splitList, 
-                               List<String> locations, 
+                               Collection<String> locations, 
                                ArrayList<OneBlockInfo> validBlocks) {
     // create an input split
     Path[] fl = new Path[validBlocks.size()];
@@ -450,9 +466,12 @@ public abstract class CombineFileInputFo
     private OneBlockInfo[] blocks;       // all blocks in this file
 
     OneFileInfo(Path path, JobConf job,
+                boolean isSplitable,
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
-                HashMap<String, List<OneBlockInfo>> nodeToBlocks)
+                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                HashMap<String, Set<String>> rackToNodes,
+                long maxSize)
                 throws IOException {
       this.fileSize = 0;
 
@@ -465,32 +484,77 @@ public abstract class CombineFileInputFo
       if (locations == null) {
         blocks = new OneBlockInfo[0];
       } else {
-        blocks = new OneBlockInfo[locations.length];
-        for (int i = 0; i < locations.length; i++) {
-           
-          fileSize += locations[i].getLength();
-          OneBlockInfo oneblock =  new OneBlockInfo(path, 
-                                       locations[i].getOffset(), 
-                                       locations[i].getLength(),
-                                       locations[i].getHosts(),
-                                       locations[i].getTopologyPaths()); 
-          blocks[i] = oneblock;
+        if (!isSplitable) {
+          // if the file is not splitable, just create the one block with
+          // full file length
+          blocks = new OneBlockInfo[1];
+          fileSize = stat.getLen();
+          blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
+              .getHosts(), locations[0].getTopologyPaths());
+        } else {
+          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
+              locations.length);
+          for (int i = 0; i < locations.length; i++) {
+            fileSize += locations[i].getLength();
+
+            // each split can be a maximum of maxSize
+            long left = locations[i].getLength();
+            long myOffset = locations[i].getOffset();
+            long myLength = 0;
+            while (left > 0) {
+              if (maxSize == 0) {
+                myLength = left;
+              } else {
+                if (left > maxSize && left < 2 * maxSize) {
+                  // if remainder is between max and 2*max - then
+                  // instead of creating splits of size max, left-max we
+                  // create splits of size left/2 and left/2. This is
+                  // a heuristic to avoid creating really really small
+                  // splits.
+                  myLength = left / 2;
+                } else {
+                  myLength = Math.min(maxSize, left);
+                }
+              }
+              OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
+                  myLength, locations[i].getHosts(), locations[i]
+                      .getTopologyPaths());
+              left -= myLength;
+              myOffset += myLength;
 
+              blocksList.add(oneblock);
+            }
+          }
+          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
+        }
+
+        for (OneBlockInfo oneblock : blocks) {
           // add this block to the block --> node locations map
           blockToNodes.put(oneblock, oneblock.hosts);
 
+          // For blocks that do not have host/rack information,
+          // assign to default rack.
+          String[] racks = null;
+          if (oneblock.hosts.length == 0) {
+            racks = new String[]{NetworkTopology.DEFAULT_RACK};
+          } else {
+            racks = oneblock.racks;
+          }
+
           // add this block to the rack --> block map
-          for (int j = 0; j < oneblock.racks.length; j++) {
-            String rack = oneblock.racks[j];
+          for (int j = 0; j < racks.length; j++) {
+            String rack = racks[j];
             List<OneBlockInfo> blklist = rackToBlocks.get(rack);
             if (blklist == null) {
               blklist = new ArrayList<OneBlockInfo>();
               rackToBlocks.put(rack, blklist);
             }
             blklist.add(oneblock);
-            // Add this host to rackToNodes map
-            addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
-         }
+            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+              // Add this host to rackToNodes map
+              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
+            }
+          }
 
           // add this block to the node --> block map
           for (int j = 0; j < oneblock.hosts.length; j++) {
@@ -553,7 +617,8 @@ public abstract class CombineFileInputFo
     }
   }
 
-  private static void addHostToRack(String rack, String host) {
+  private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
+      String rack, String host) {
     Set<String> hosts = rackToNodes.get(rack);
     if (hosts == null) {
       hosts = new HashSet<String>();
@@ -561,11 +626,14 @@ public abstract class CombineFileInputFo
     }
     hosts.add(host);
   }
+
   
-  private static List<String> getHosts(List<String> racks) {
-    List<String> hosts = new ArrayList<String>();
+  private Set<String> getHosts(Set<String> racks) {
+    Set<String> hosts = new HashSet<String>();
     for (String rack : racks) {
-      hosts.addAll(rackToNodes.get(rack));
+      if (rackToNodes.containsKey(rack)) {
+        hosts.addAll(rackToNodes.get(rack));
+      }
     }
     return hosts;
   }
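
The block-chopping loop added to OneFileInfo above caps every sub-block at maxSize and, when the remainder falls strictly between maxSize and 2*maxSize, halves it instead of leaving a tiny tail. A minimal standalone sketch of that heuristic (class and method names are hypothetical, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;

    class SplitSizeHeuristic {
      // Chop a block of 'length' bytes into pieces of at most 'maxSize',
      // halving any remainder between maxSize and 2 * maxSize so that no
      // piece ends up much smaller than the others.
      static List<Long> chop(long length, long maxSize) {
        List<Long> pieces = new ArrayList<Long>();
        long left = length;
        while (left > 0) {
          long myLength;
          if (maxSize == 0) {
            myLength = left;                    // no cap configured
          } else if (left > maxSize && left < 2 * maxSize) {
            myLength = left / 2;                // split the tail evenly
          } else {
            myLength = Math.min(maxSize, left);
          }
          pieces.add(myLength);
          left -= myLength;
        }
        return pieces;
      }

      public static void main(String[] args) {
        // A 150 MB block with a 100 MB cap yields 75 MB + 75 MB rather
        // than 100 MB + 50 MB: prints [78643200, 78643200].
        System.out.println(chop(150L << 20, 100L << 20));
      }
    }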

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java Fri Jun 21 06:37:27 2013
@@ -97,14 +97,14 @@ public class NLineInputFormat extends Fi
           numLines++;
           length += num;
           if (numLines == N) {
-            splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+            splits.add(createFileSplit(fileName, begin, length));
             begin += length;
             length = 0;
             numLines = 0;
           }
         }
         if (numLines != 0) {
-          splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+          splits.add(createFileSplit(fileName, begin, length));
         }
    
       } finally {
@@ -116,6 +116,23 @@ public class NLineInputFormat extends Fi
     return splits.toArray(new FileSplit[splits.size()]);
   }
 
+  /**
+   * NLineInputFormat uses LineRecordReader, which always reads
+   * (and consumes) at least one character out of its upper split
+   * boundary. So to make sure that each mapper gets N lines, we
+   * move back the upper split limits of each split 
+   * by one character here.
+   * @param fileName  Path of file
+   * @param begin  the position of the first byte in the file to process
+   * @param length  number of bytes in InputSplit
+   * @return  FileSplit
+   */
+  protected static FileSplit createFileSplit(Path fileName, long begin, long length) {
+    return (begin == 0) 
+    ? new FileSplit(fileName, begin, length - 1, new String[] {})
+    : new FileSplit(fileName, begin - 1, length, new String[] {});
+  }
+
   public void configure(JobConf conf) {
     N = conf.getInt("mapred.line.input.format.linespermap", 1);
   }
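
For intuition, createFileSplit rewinds every split after the first by one byte, because LineRecordReader always consumes one line past its nominal start. A sketch of the boundaries it produces (assumes caller code in the org.apache.hadoop.mapred.lib package, since the helper is protected; the path is made up):

    Path file = new Path("/data/input.txt");
    // begin == 0: trim one byte off the end -> covers bytes [0, 99)
    FileSplit s0 = NLineInputFormat.createFileSplit(file, 0, 100);
    // begin != 0: start one byte early -> covers bytes [99, 199)
    FileSplit s1 = NLineInputFormat.createFileSplit(file, 100, 100);
    // Adjacent splits overlap by exactly the one byte that the reader
    // of s1 consumes when it discards its partial first line.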

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/TaggedInputSplit.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/TaggedInputSplit.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/TaggedInputSplit.java Fri Jun 21 06:37:27 2013
@@ -137,4 +137,9 @@ class TaggedInputSplit implements Config
     this.conf = conf;
   }
 
+  @Override
+  public String toString() {
+    return inputSplit.toString();
+  }
+
 }

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJob.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJob.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJob.java Fri Jun 21 06:37:27 2013
@@ -85,7 +85,7 @@ public class ValueAggregatorJob {
     
     JobControl theControl = new JobControl("ValueAggregatorJobs");
     ArrayList<Job> dependingJobs = new ArrayList<Job>();
-    JobConf aJobConf = createValueAggregatorJob(args);
+    JobConf aJobConf = createValueAggregatorJob(args, (Class<?>) null);
     if(descriptors != null)
       setAggregatorDescriptors(aJobConf, descriptors);
     Job aJob = new Job(aJobConf, dependingJobs);
@@ -96,18 +96,34 @@ public class ValueAggregatorJob {
   public static JobControl createValueAggregatorJobs(String args[]) throws IOException {
     return createValueAggregatorJobs(args, null);
   }
-  
+
+  /**
+   * Create an Aggregate based map/reduce job.
+   *
+   * @param args the arguments used for job creation. Generic hadoop
+   * arguments are accepted.
+   * @return a JobConf object ready for submission.
+   *
+   * @throws IOException
+   * @see GenericOptionsParser
+   */
+  public static JobConf createValueAggregatorJob(String args[])
+  throws IOException {
+    return createValueAggregatorJob(args, (Class<?>) null);
+  }
+
   /**
    * Create an Aggregate based map/reduce job.
    * 
    * @param args the arguments used for job creation. Generic hadoop
    * arguments are accepted.
+   * @param caller the caller class.
    * @return a JobConf object ready for submission.
    * 
    * @throws IOException
    * @see GenericOptionsParser
    */
-  public static JobConf createValueAggregatorJob(String args[])
+  public static JobConf createValueAggregatorJob(String args[], Class<?> caller)
     throws IOException {
 
     Configuration conf = new Configuration();
@@ -156,7 +172,7 @@ public class ValueAggregatorJob {
     }
     String userJarFile = theJob.get("user.jar.file");
     if (userJarFile == null) {
-      theJob.setJarByClass(ValueAggregator.class);
+      theJob.setJarByClass(caller != null ? caller : ValueAggregatorJob.class);
     } else {
       theJob.setJar(userJarFile);
     }
@@ -180,10 +196,16 @@ public class ValueAggregatorJob {
     return theJob;
   }
 
+  public static JobConf createValueAggregatorJob(String args[],
+      Class<? extends ValueAggregatorDescriptor>[] descriptors)
+      throws IOException {
+    return createValueAggregatorJob(args, descriptors, null);
+  }
+
   public static JobConf createValueAggregatorJob(String args[]
-    , Class<? extends ValueAggregatorDescriptor>[] descriptors)
+    , Class<? extends ValueAggregatorDescriptor>[] descriptors, Class<?> caller)
   throws IOException {
-    JobConf job = createValueAggregatorJob(args);
+    JobConf job = createValueAggregatorJob(args, caller);
     setAggregatorDescriptors(job, descriptors);
     return job;
   }
@@ -204,7 +226,8 @@ public class ValueAggregatorJob {
    * @throws IOException
    */
   public static void main(String args[]) throws IOException {
-    JobConf job = ValueAggregatorJob.createValueAggregatorJob(args);
+    JobConf job = ValueAggregatorJob.createValueAggregatorJob(args,
+        ValueAggregatorJob.class);
     JobClient.runJob(job);
   }
 }
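
The new caller parameter exists so that setJarByClass can resolve the jar that actually contains the user's code instead of always shipping the jar holding the aggregate framework. A hedged usage sketch (MyDriver and MyAggregatorDescriptor are hypothetical user classes):

    JobConf job = ValueAggregatorJob.createValueAggregatorJob(
        args,
        new Class[] { MyAggregatorDescriptor.class },
        MyDriver.class);   // job jar is located from the caller's class
    JobClient.runJob(job);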

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
 package org.apache.hadoop.mapred.lib.db;
 
 import java.sql.PreparedStatement;

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java Fri Jun 21 06:37:27 2013
@@ -19,10 +19,10 @@ package org.apache.hadoop.mapred.tools;
 
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.AdminOperationsProtocol;
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.ToolRunner
  * and 1) refresh the service-level authorization policy, 2) refresh queue acl
  * properties.
  */
+@Private
 public class MRAdmin extends Configured implements Tool {
 
   public MRAdmin() {
@@ -57,7 +58,9 @@ public class MRAdmin extends Configured 
     "The full syntax is: \n\n" +
     "hadoop mradmin [-refreshServiceAcl] [-refreshQueues] " +
     "[-refreshNodes] [-refreshUserToGroupsMappings] " +
-    "[-refreshSuperUserGroupsConfiguration] [-help [cmd]]\n";
+    "[-refreshSuperUserGroupsConfiguration] " +
+    "[-safemode <enter | leave | wait | get> " +
+    "[-help [cmd]]\n";
 
   String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
     "\t\tJobtracker will reload the authorization policy file.\n";
@@ -74,6 +77,14 @@ public class MRAdmin extends Configured 
   String refreshNodes =
     "-refreshNodes: Refresh the hosts information at the jobtracker.\n";
   
+  String safemode = "-safemode <enter|leave|get|wait>:  Safe mode maintenance command.\n" + 
+      "\t\tSafe mode is a JobTracker state in which it\n" +
+      "\t\t\t1.  does not accept new job submissions\n" +
+      "\t\t\t2.  does not schedule any new tasks\n" +
+      "\t\t\t3.  does not fail any tasks due to any error\n" +
+      "\t\tSafe mode can be entered manually, but then\n" +
+      "\t\tit can only be turned off manually as well.\n";
+      
   String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
     "\t\tis specified.\n";
 
@@ -87,6 +98,8 @@ public class MRAdmin extends Configured 
     System.out.println(refreshSuperUserGroupsConfiguration);
   }  else if ("refreshNodes".equals(cmd)) {
     System.out.println(refreshNodes);
+  }  else if ("safemode".equals(cmd)) {
+    System.out.println(safemode);
   } else if ("help".equals(cmd)) {
     System.out.println(help);
   } else {
@@ -125,7 +138,8 @@ public class MRAdmin extends Configured 
       System.err.println("           [-refreshQueues]");
       System.err.println("           [-refreshUserToGroupsMappings]");
       System.err.println("           [-refreshSuperUserGroupsConfiguration]");
-      System.err.println("           [-refreshNodes]");
+      System.err.println("           [-refreshNodes]");      
+      System.err.println("           [-safemode <enter | leave | get | wait>]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
@@ -208,6 +222,58 @@ public class MRAdmin extends Configured 
     return 0;
   }
 
+  private int setSafeMode(String actionString) throws IOException {
+    JobTracker.SafeModeAction action;
+    Boolean waitExitSafe = false;
+
+    if ("leave".equalsIgnoreCase(actionString)) {
+      action = JobTracker.SafeModeAction.SAFEMODE_LEAVE;
+    } else if ("enter".equalsIgnoreCase(actionString)) {
+      action = JobTracker.SafeModeAction.SAFEMODE_ENTER;
+    } else if ("get".equalsIgnoreCase(actionString)) {
+      action = JobTracker.SafeModeAction.SAFEMODE_GET;
+    } else if ("wait".equalsIgnoreCase(actionString)) {
+      action = JobTracker.SafeModeAction.SAFEMODE_GET;
+      waitExitSafe = true;
+    } else {
+      printUsage("-safemode");
+      return -1;
+    }
+
+    // Get the current configuration
+    Configuration conf = getConf();
+    
+    // Create the client
+    AdminOperationsProtocol adminOperationsProtocol = 
+      (AdminOperationsProtocol) 
+      RPC.getProxy(AdminOperationsProtocol.class, 
+                   AdminOperationsProtocol.versionID, 
+                   JobTracker.getAddress(conf), getUGI(conf), conf,
+                   NetUtils.getSocketFactory(conf, 
+                                             AdminOperationsProtocol.class));
+    
+
+    boolean inSafeMode = adminOperationsProtocol.setSafeMode(action);
+
+    //
+    // If we are waiting for safemode to exit, then poll and
+    // sleep till we are out of safemode.
+    //
+    if (waitExitSafe) {
+      while (inSafeMode) {
+        try {
+          Thread.sleep(3000);
+        } catch (java.lang.InterruptedException e) {
+          throw new IOException("Wait Interrupted");
+        }
+        inSafeMode = adminOperationsProtocol.setSafeMode(action);
+      }
+    }
+
+    System.out.println("Safe mode is " + (inSafeMode ? "ON" : "OFF"));
+
+    return 0;
+  }
   
   /**
    * refreshSuperUserGroupsConfiguration {@link JobTracker}.
@@ -297,6 +363,12 @@ public class MRAdmin extends Configured 
         return exitCode;
       }
     }
+    if ("-safemode".equals(cmd)) {
+      if (args.length != 2) {
+        printUsage(cmd);
+        return exitCode;
+      }
+    }
     
     exitCode = 0;
     try {
@@ -310,6 +382,8 @@ public class MRAdmin extends Configured 
         exitCode = refreshSuperUserGroupsConfiguration();
       } else if ("-refreshNodes".equals(cmd)) {
         exitCode = refreshNodes();
+      } else if ("-safemode".equals(cmd)) {
+        exitCode = setSafeMode(args[i++]);
       } else if ("-help".equals(cmd)) {
         if (i < args.length) {
           printUsage(args[i]);
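
The new -safemode subcommand mirrors dfsadmin: "wait" re-issues setSafeMode(SAFEMODE_GET) every three seconds until the JobTracker reports it has left safe mode. Invoking it programmatically is a one-liner through ToolRunner (a sketch; ToolRunner.run throws Exception):

    // Block until the JobTracker leaves safe mode, equivalent to
    // running `hadoop mradmin -safemode wait` on the command line.
    int rc = ToolRunner.run(new MRAdmin(),
                            new String[] { "-safemode", "wait" });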

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java Fri Jun 21 06:37:27 2013
@@ -22,10 +22,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Locale;
 import java.util.MissingResourceException;
 import java.util.ResourceBundle;
 import java.util.TreeMap;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -45,9 +47,12 @@ public class CounterGroup implements Wri
    * Returns the specified resource bundle, or throws an exception.
    * @throws MissingResourceException if the bundle isn't found
    */
-  private static ResourceBundle getResourceBundle(String enumClassName) {
+  @Private
+  public static ResourceBundle getResourceBundle(String enumClassName)
+      throws MissingResourceException {
     String bundleName = enumClassName.replace('$','_');
-    return ResourceBundle.getBundle(bundleName);
+    return ResourceBundle.getBundle(bundleName, Locale.getDefault(), Thread
+        .currentThread().getContextClassLoader());
   }
 
   protected CounterGroup(String name) {

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Counters.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Counters.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Counters.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
 package org.apache.hadoop.mapreduce;
 
 import java.io.DataInput;

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/Job.java Fri Jun 21 06:37:27 2013
@@ -42,6 +42,56 @@ public class Job extends JobContext {  
   private JobClient jobClient;
   private RunningJob info;
 
+  /**
+   * Creates a new {@link Job}
+   * A Job will be created with a generic {@link Configuration}.
+   *
+   * @return the {@link Job}
+   * @throws IOException
+   */
+  public static Job getInstance() throws IOException {
+    // create with a null Cluster
+    return getInstance(new Configuration());
+  }
+
+  /**
+   * Creates a new {@link Job} with a given {@link Configuration}.
+   *
+   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
+   * that any necessary internal modifications do not reflect on the incoming
+   * parameter.
+   *
+   * @param conf the {@link Configuration}
+   * @return the {@link Job}
+   * @throws IOException
+   */
+  public static Job getInstance(Configuration conf) throws IOException {
+    // create with a null Cluster
+    JobConf jobConf = new JobConf(conf);
+    return new Job(jobConf);
+  }
+
+  /**
+   * Creates a new {@link Job} with a given {@link Configuration}
+   * and a given jobName.
+   *
+   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
+   * that any necessary internal modifications do not reflect on the incoming
+   * parameter.
+   *
+   * @param conf the {@link Configuration}
+   * @param jobName the job instance's name
+   * @return the {@link Job}
+   * @throws IOException
+   */
+  public static Job getInstance(Configuration conf, String jobName)
+           throws IOException {
+    // create with a null Cluster
+    Job result = getInstance(conf);
+    result.setJobName(jobName);
+    return result;
+  }
+
   public Job() throws IOException {
     this(new Configuration());
   }
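
The new factories copy the incoming Configuration into a fresh JobConf, so mutations made through the Job never leak back into the caller's object. Typical use (the configuration key is a hypothetical illustration):

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    // Changing the job's copy leaves the caller's conf untouched:
    job.getConfiguration().set("example.flag", "on");
    assert conf.get("example.flag") == null;   // copy semantics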

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java Fri Jun 21 06:37:27 2013
@@ -26,8 +26,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.conf.Configuration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * A utility to manage job submission files.<br/>
  * <b><i>Note that this class is for framework internal usage only and is
@@ -35,6 +38,8 @@ import org.apache.hadoop.conf.Configurat
  */
 public class JobSubmissionFiles {
 
+  private final static Log LOG = LogFactory.getLog(JobSubmissionFiles.class);
+
   // job submission directory is private!
   final public static FsPermission JOB_DIR_PERMISSION =
     FsPermission.createImmutable((short) 0700); // rwx--------
@@ -105,17 +110,22 @@ public class JobSubmissionFiles {
     if (fs.exists(stagingArea)) {
       FileStatus fsStatus = fs.getFileStatus(stagingArea);
       if (!(fsStatus.isOwnedByUser(currentUser, currentUgi.getGroupNames())
-            || fsStatus.isOwnedByUser(realUser, ugi.getGroupNames()))
-          || !fsStatus.getPermission().equals(JOB_DIR_PERMISSION)) {
-         throw new IOException("The ownership/permissions on the staging " +
-                      "directory " + stagingArea + " is not as expected. " + 
-                      "It is owned by " + fsStatus.getOwner() + " and permissions are "+ 
-                      fsStatus.getPermission() + ". The directory must " +
+            || fsStatus.isOwnedByUser(realUser, ugi.getGroupNames()))) {
+         throw new IOException("The ownership on the staging directory " +
+                      stagingArea + " is not as expected. " +
+                      "It is owned by " + fsStatus.getOwner() + ". The directory must " +
                       "be owned by the submitter " + currentUser + " or " +
-                      "by " + realUser + " and permissions must be rwx------");
+                      "by " + realUser);
+      }
+      if (!fsStatus.getPermission().equals(JOB_DIR_PERMISSION)) {
+        LOG.info("Permissions on staging directory " + stagingArea + " are " +
+          "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+          "to correct value " + JOB_DIR_PERMISSION);
+        fs.setPermission(stagingArea, JOB_DIR_PERMISSION);
       }
     } else {
-      fs.mkdirs(stagingArea, new FsPermission(JOB_DIR_PERMISSION));
+      fs.mkdirs(stagingArea, 
+          new FsPermission(JOB_DIR_PERMISSION));
     }
     return stagingArea;
   }

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java Fri Jun 21 06:37:27 2013
@@ -112,7 +112,8 @@ public class ReduceContext<KEYIN,VALUEIN
     buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
     key = keyDeserializer.deserialize(key);
     next = input.getValue();
-    buffer.reset(next.getData(), next.getPosition(), next.getLength());
+    buffer.reset(next.getData(), next.getPosition(),
+        next.getLength() - next.getPosition());
     value = valueDeserializer.deserialize(value);
     hasMore = input.next();
     if (hasMore) {
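
The one-line change matters because DataInputBuffer.reset(byte[], int, int) takes a byte count, while getLength() on the value buffer reports the end of the valid region rather than the number of unread bytes. A sketch of the distinction, using the same names as the method above:

    byte[] raw = next.getData();
    int pos = next.getPosition();   // first byte of the value
    int len = next.getLength();     // end of the valid region
    // Wrong: treats 'len' as a count and reads past the value:
    //   buffer.reset(raw, pos, len);
    // Right: the value occupies exactly (len - pos) bytes:
    buffer.reset(raw, pos, len - pos);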

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Fri Jun 21 06:37:27 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -147,7 +148,10 @@ public abstract class CombineFileInputFo
   protected boolean isSplitable(JobContext context, Path file) {
     final CompressionCodec codec =
       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
-    return codec == null;
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
   }
 
   /**

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat_Counter.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat_Counter.properties (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat_Counter.properties Fri Jun 21 06:37:27 2013
@@ -1,3 +1,16 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+
 # ResourceBundle properties file for file-input-format counters
 
 CounterGroupName=                  File Input Format Counters 

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Fri Jun 21 06:37:27 2013
@@ -24,10 +24,15 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -49,6 +54,9 @@ public class LineRecordReader extends Re
   private int maxLineLength;
   private LongWritable key = null;
   private Text value = null;
+  private Seekable filePosition;
+  private CompressionCodec codec;
+  private Decompressor decompressor;
 
   public void initialize(InputSplit genericSplit,
                          TaskAttemptContext context) throws IOException {
@@ -60,30 +68,62 @@ public class LineRecordReader extends Re
     end = start + split.getLength();
     final Path file = split.getPath();
     compressionCodecs = new CompressionCodecFactory(job);
-    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
     FileSystem fs = file.getFileSystem(job);
     FSDataInputStream fileIn = fs.open(split.getPath());
-    boolean skipFirstLine = false;
-    if (codec != null) {
-      in = new LineReader(codec.createInputStream(fileIn), job);
-      end = Long.MAX_VALUE;
-    } else {
-      if (start != 0) {
-        skipFirstLine = true;
-        --start;
-        fileIn.seek(start);
+
+    if (isCompressedInput()) {
+      decompressor = CodecPool.getDecompressor(codec);
+      if (codec instanceof SplittableCompressionCodec) {
+        final SplitCompressionInputStream cIn =
+          ((SplittableCompressionCodec)codec).createInputStream(
+            fileIn, decompressor, start, end,
+            SplittableCompressionCodec.READ_MODE.BYBLOCK);
+        in = new LineReader(cIn, job);
+        start = cIn.getAdjustedStart();
+        end = cIn.getAdjustedEnd();
+        filePosition = cIn;
+      } else {
+        in = new LineReader(codec.createInputStream(fileIn, decompressor),
+            job);
+        filePosition = fileIn;
       }
+    } else {
+      fileIn.seek(start);
       in = new LineReader(fileIn, job);
+      filePosition = fileIn;
     }
-    if (skipFirstLine) {  // skip first line and re-establish "start".
-      start += in.readLine(new Text(), 0,
-                           (int)Math.min((long)Integer.MAX_VALUE, end - start));
+    // If this is not the first split, we always throw away the first
+    // record, because we always (except for the last split) read one
+    // extra line in the next() method.
+    if (start != 0) {
+      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
     }
     this.pos = start;
   }
   
+  private boolean isCompressedInput() {
+    return (codec != null);
+  }
+
+  private int maxBytesToConsume(long pos) {
+    return isCompressedInput()
+      ? Integer.MAX_VALUE
+      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+  }
+
+  private long getFilePosition() throws IOException {
+    long retVal;
+    if (isCompressedInput() && null != filePosition) {
+      retVal = filePosition.getPos();
+    } else {
+      retVal = pos;
+    }
+    return retVal;
+  }
+
   public boolean nextKeyValue() throws IOException {
     if (key == null) {
       key = new LongWritable();
@@ -93,10 +133,11 @@ public class LineRecordReader extends Re
       value = new Text();
     }
     int newSize = 0;
-    while (pos < end) {
+    // We always read one extra line, which lies outside the upper
+    // split limit i.e. (end - 1)
+    while (getFilePosition() <= end) {
       newSize = in.readLine(value, maxLineLength,
-                            Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
-                                     maxLineLength));
+          Math.max(maxBytesToConsume(pos), maxLineLength));
       if (newSize == 0) {
         break;
       }
@@ -131,17 +172,24 @@ public class LineRecordReader extends Re
   /**
    * Get the progress within the split
    */
-  public float getProgress() {
+  public float getProgress() throws IOException {
     if (start == end) {
       return 0.0f;
     } else {
-      return Math.min(1.0f, (pos - start) / (float)(end - start));
+      return Math.min(1.0f,
+        (getFilePosition() - start) / (float)(end - start));
     }
   }
-  
+
   public synchronized void close() throws IOException {
-    if (in != null) {
-      in.close(); 
+    try {
+      if (in != null) {
+        in.close();
+      }
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
     }
   }
 }
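
Taken together, these changes implement the usual split-boundary protocol for line-oriented input: a reader owns every line whose first byte lies in [start, end), so each reader except the first discards the partial line at its start, and every reader reads one line past end, consuming the line that straddles the boundary. A self-contained restatement of that invariant (a sketch: assumes the reader is already positioned at 'start', one byte per character, and '\n' line endings):

    import java.io.BufferedReader;
    import java.io.IOException;

    class SplitBoundary {
      static long countOwnedLines(BufferedReader in, long start, long end)
          throws IOException {
        long pos = start;
        if (start != 0) {
          String skipped = in.readLine(); // belongs to the previous reader
          if (skipped != null) {
            pos += skipped.length() + 1;
          }
        }
        long count = 0;
        while (pos <= end) {              // read one line past 'end'
          String line = in.readLine();
          if (line == null) {
            break;
          }
          pos += line.length() + 1;       // +1 for the '\n'
          count++;
        }
        return count;
      }
    }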

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java Fri Jun 21 06:37:27 2013
@@ -107,25 +107,14 @@ public class NLineInputFormat extends Fi
         numLines++;
         length += num;
         if (numLines == numLinesPerSplit) {
-          // NLineInputFormat uses LineRecordReader, which always reads
-          // (and consumes) at least one character out of its upper split
-          // boundary. So to make sure that each mapper gets N lines, we
-          // move back the upper split limits of each split 
-          // by one character here.
-          if (begin == 0) {
-            splits.add(new FileSplit(fileName, begin, length - 1,
-              new String[] {}));
-          } else {
-            splits.add(new FileSplit(fileName, begin - 1, length,
-              new String[] {}));
-          }
+          splits.add(createFileSplit(fileName, begin, length));
           begin += length;
           length = 0;
           numLines = 0;
         }
       }
       if (numLines != 0) {
-        splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+        splits.add(createFileSplit(fileName, begin, length));
       }
     } finally {
       if (lr != null) {
@@ -134,6 +123,23 @@ public class NLineInputFormat extends Fi
     }
     return splits; 
   }
+
+  /**
+   * NLineInputFormat uses LineRecordReader, which always reads
+   * (and consumes) at least one character out of its upper split
+   * boundary. So to make sure that each mapper gets N lines, we
+   * move back the upper split limits of each split 
+   * by one character here.
+   * @param fileName  Path of file
+   * @param begin  the position of the first byte in the file to process
+   * @param length  number of bytes in InputSplit
+   * @return  FileSplit
+   */
+  protected static FileSplit createFileSplit(Path fileName, long begin, long length) {
+    return (begin == 0) 
+    ? new FileSplit(fileName, begin, length - 1, new String[] {})
+    : new FileSplit(fileName, begin - 1, length, new String[] {});
+  }
   
   /**
    * Set the number of lines per split

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java Fri Jun 21 06:37:27 2013
@@ -156,4 +156,9 @@ class TaggedInputSplit extends InputSpli
     this.conf = conf;
   }
 
+  @Override
+  public String toString() {
+    return inputSplit.toString();
+  }
+
 }

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java Fri Jun 21 06:37:27 2013
@@ -23,6 +23,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -45,7 +46,10 @@ public class TextInputFormat extends Fil
   protected boolean isSplitable(JobContext context, Path file) {
     CompressionCodec codec = 
       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
-    return codec == null;
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
   }
 
 }
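
With this change a compression codec no longer disables splitting outright; only codecs that cannot re-synchronize mid-stream do. Assuming BZip2Codec implements SplittableCompressionCodec on this branch (as it does on trunk), .bz2 inputs become splittable while .gz inputs stay whole-file. A quick probe of a path (fragment; the file name is made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.SplittableCompressionCodec;

    Configuration conf = new Configuration();
    CompressionCodec codec =
        new CompressionCodecFactory(conf).getCodec(new Path("logs/x.bz2"));
    boolean splittable =
        codec == null || codec instanceof SplittableCompressionCodec;
    System.out.println(splittable);  // true for .bz2, false for .gz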

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat_Counter.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat_Counter.properties (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat_Counter.properties Fri Jun 21 06:37:27 2013
@@ -1,3 +1,16 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+
 # ResourceBundle properties file for file-output-format counters
 
 CounterGroupName=                  File Output Format Counters 

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java Fri Jun 21 06:37:27 2013
@@ -44,8 +44,8 @@ import org.apache.hadoop.mapreduce.lib.p
  *  character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
  *  of the field); if omitted from pos2, it defaults to 0 (the end of the
  *  field). opts are ordering options (any of 'nr' as described above). 
- * We assume that the fields in the key are separated by 
- * {@link JobContext#MAP_OUTPUT_KEY_FIELD_SEPERATOR}.
+ * We assume that the fields in the key are separated by
+ * mapreduce.map.output.key.field.separator.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
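
For the stable mapred API, the comparator and its options are normally wired up through JobConf; e.g. "-k2,2nr" sorts on the second separator-delimited field, numerically and in reverse. A hedged sketch:

    JobConf conf = new JobConf();
    conf.setOutputKeyComparatorClass(
        org.apache.hadoop.mapred.lib.KeyFieldBasedComparator.class);
    // Sort on the second field, numeric, descending:
    conf.setKeyFieldComparatorOptions("-k2,2nr");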

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Jun 21 06:37:27 2013
@@ -52,6 +52,9 @@ public class TokenCache {
   
   private static final Log LOG = LogFactory.getLog(TokenCache.class);
 
+  static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = 
+    "mapreduce.job.credentials.binary";
+
   /**
    * auxiliary method to get user's secret keys..
    * @param alias
@@ -79,6 +82,16 @@ public class TokenCache {
     obtainTokensForNamenodesInternal(credentials, ps, conf);
   }
 
+  /**
+   * Remove jobtoken referrals which don't make sense in the context
+   * of the task execution.
+   *
+   * @param conf configuration object.
+   */
+  public static void cleanUpTokenReferral(Configuration conf) {
+    conf.unset(MAPREDUCE_JOB_CREDENTIALS_BINARY);
+  }
+
   static void obtainTokensForNamenodesInternal(Credentials credentials,
                                                Path [] ps, 
                                                Configuration conf
@@ -99,7 +112,7 @@ public class TokenCache {
         if (readFile) {
           readFile = false;
           String binaryTokenFilename =
-            conf.get("mapreduce.job.credentials.binary");
+            conf.get(MAPREDUCE_JOB_CREDENTIALS_BINARY);
           if (binaryTokenFilename != null) {
             Credentials binary;
             try {
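
The referral names a credentials file on the submitting host, which is meaningless (and potentially sensitive) inside a task, so the framework can scrub it from the conf before localization. A small illustration (the file path is made up):

    Configuration conf = new Configuration();
    conf.set("mapreduce.job.credentials.binary", "/tmp/creds.bin");
    TokenCache.cleanUpTokenReferral(conf);
    assert conf.get("mapreduce.job.credentials.binary") == null;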

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Fri Jun 21 06:37:27 2013
@@ -89,7 +89,7 @@ public class DelegationTokenRenewal {
   }
   
   // global single timer (daemon)
-  private static Timer renewalTimer = new Timer(true);
+  private static Timer renewalTimer = null;
   
   //delegation token canceler thread
   private static DelegationTokenCancelThread dtCancelThread =
@@ -200,11 +200,16 @@ public class DelegationTokenRenewal {
    */
   private static class RenewalTimerTask extends TimerTask {
     private DelegationTokenToRenew dttr;
+    private boolean cancelled = false;
     
     RenewalTimerTask(DelegationTokenToRenew t) {  dttr = t;  }
     
     @Override
-    public void run() {
+    public synchronized void run() {
+      if (cancelled) {
+        return;
+      }
+
       Token<?> token = dttr.token;
       try {
         // need to use doAs so that http can find the kerberos tgt
@@ -227,12 +232,18 @@ public class DelegationTokenRenewal {
         removeFailedDelegationToken(dttr);
       }
     }
+
+    @Override
+    public synchronized boolean cancel() {
+      cancelled = true;
+      return super.cancel();
+    }
   }
   
   /**
    * set task to renew the token
    */
-  private static 
+  private static synchronized
   void setTimerForTokenRenewal(DelegationTokenToRenew token, 
                                boolean firstTime) throws IOException {
       
@@ -250,14 +261,20 @@ public class DelegationTokenRenewal {
     TimerTask tTask = new RenewalTimerTask(token);
     token.setTimerTask(tTask); // keep reference to the timer
 
+    if (renewalTimer == null) {
+        renewalTimer = new Timer(true);
+    }
     renewalTimer.schedule(token.timerTask, new Date(renewIn));
   }
 
   /**
    * removing all tokens renewals
    */
-  static public void close() {
-    renewalTimer.cancel();
+  public static synchronized void close() {
+    if (renewalTimer != null) {
+      renewalTimer.cancel();
+    }
+    renewalTimer = null;
     delegationTokens.clear();
   }
   

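The DelegationTokenRenewal changes construct the shared Timer lazily under the class lock and give the renewal task an explicit cancelled flag, because java.util.Timer does not stop a task it has already dequeued for execution. The idiom in isolation, as a sketch (renewToken() stands in for the real renewal logic):

    import java.util.TimerTask;

    class CancellableTaskSketch extends TimerTask {
      private boolean cancelled = false;

      @Override
      public synchronized void run() {
        if (cancelled) {
          return;  // cancelled after the Timer dequeued us: do nothing
        }
        renewToken();
      }

      @Override
      public synchronized boolean cancel() {
        cancelled = true;  // visible to run() via the shared monitor
        return super.cancel();
      }

      private void renewToken() { /* real renewal work elided */ }
    }

Because both methods synchronize on the task, cancel() either completes before the body starts or blocks until it finishes; TimerTask.cancel() alone gives no such guarantee for an already-dequeued task.
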
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java Fri Jun 21 06:37:27 2013
@@ -116,15 +116,15 @@ public class JobSplitWriter {
     if (array.length != 0) {
       SerializationFactory factory = new SerializationFactory(conf);
       int i = 0;
-      long offset = out.size();
+      long offset = out.getPos();
       for(T split: array) {
-        int prevCount = out.size();
+        long prevCount = out.getPos();
         Text.writeString(out, split.getClass().getName());
         Serializer<T> serializer = 
           factory.getSerializer((Class<T>) split.getClass());
         serializer.open(out);
         serializer.serialize(split);
-        int currCount = out.size();
+        long currCount = out.getPos();
         String[] locations = split.getLocations();
         final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
         if (locations.length > max_loc) {
@@ -149,12 +149,12 @@ public class JobSplitWriter {
     SplitMetaInfo[] info = new SplitMetaInfo[splits.length];
     if (splits.length != 0) {
       int i = 0;
-      long offset = out.size();
+      long offset = out.getPos();
       for(org.apache.hadoop.mapred.InputSplit split: splits) {
-        int prevLen = out.size();
+        long prevLen = out.getPos();
         Text.writeString(out, split.getClass().getName());
         split.write(out);
-        int currLen = out.size();
+        long currLen = out.getPos();
         String[] locations = split.getLocations();
         final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
         if (locations.length > max_loc) {

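The JobSplitWriter hunks swap out.size() for out.getPos(). DataOutputStream.size() is an int counter that saturates at Integer.MAX_VALUE, so every split offset recorded past 2 GB in the split file was silently wrong; FSDataOutputStream.getPos() reports the true position as a long. A sketch of the corrected bookkeeping (out is assumed to be an FSDataOutputStream, as in the surrounding code):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;

    class SplitOffsetSketch {
      static long offsetOf(FSDataOutputStream out) throws IOException {
        // exact byte position as a long, valid past 2 GB;
        // DataOutputStream.size() would have stopped counting
        // at Integer.MAX_VALUE
        return out.getPos();
      }
    }
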
Modified: hadoop/common/branches/branch-1-win/src/native/configure.ac
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/native/configure.ac?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/native/configure.ac (original)
+++ hadoop/common/branches/branch-1-win/src/native/configure.ac Fri Jun 21 06:37:27 2013
@@ -39,6 +39,7 @@ AC_CONFIG_SRCDIR([src/org_apache_hadoop.
 AC_CONFIG_AUX_DIR(config)
 AC_CONFIG_HEADER([config.h])
 AC_SYS_LARGEFILE
+AC_GNU_SOURCE
 
 AM_INIT_AUTOMAKE(hadoop,1.0.0)
 
@@ -80,10 +81,8 @@ then
     JNI_CPPFLAGS="$JNI_CPPFLAGS -I$dir"
   done
 fi
-cppflags_bak=$CPPFLAGS
 CPPFLAGS="$CPPFLAGS $JNI_CPPFLAGS"
 AC_CHECK_HEADERS([jni.h], [], AC_MSG_ERROR([Native java headers not found. Is \$JAVA_HOME set correctly?]))
-CPPFLAGS=$cppflags_bak
 AC_SUBST([JNI_CPPFLAGS])
 
 dnl Check for zlib headers
@@ -105,6 +104,12 @@ AC_CHECK_HEADERS([snappy-c.h],
 dnl Check for headers needed by the native Group resolution implementation
 AC_CHECK_HEADERS([fcntl.h stdlib.h string.h unistd.h], [], AC_MSG_ERROR(Some system headers not found... please ensure their presence on your platform.))
 
+dnl check for posix_fadvise
+AC_CHECK_HEADERS(fcntl.h, [AC_CHECK_FUNCS(posix_fadvise)])
+
+dnl check for sync_file_range
+AC_CHECK_HEADERS(fcntl.h, [AC_CHECK_FUNCS(sync_file_range)])
+
 # Checks for typedefs, structures, and compiler characteristics.
 AC_C_CONST
 

Modified: hadoop/common/branches/branch-1-win/src/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/native/src/org/apache/hadoop/io/nativeio/NativeIO.c?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/native/src/org/apache/hadoop/io/nativeio/NativeIO.c (original)
+++ hadoop/common/branches/branch-1-win/src/native/src/org/apache/hadoop/io/nativeio/NativeIO.c Fri Jun 21 06:37:27 2013
@@ -35,6 +35,7 @@
 #include <fcntl.h>
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <sys/syscall.h>
 #include "errno_enum.h"
 #endif
 
@@ -202,6 +203,80 @@ cleanup:
 #endif
 }
 
+/**
+ * public static native void posix_fadvise(
+ *   FileDescriptor fd, long offset, long len, int flags);
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_posix_1fadvise(
+  JNIEnv *env, jclass clazz,
+  jobject fd_object, jlong offset, jlong len, jint flags)
+{
+#ifndef HAVE_POSIX_FADVISE
+  THROW(env, "java/lang/UnsupportedOperationException",
+        "fadvise support not available");
+#else
+  int fd = fd_get(env, fd_object);
+  PASS_EXCEPTIONS(env);
+
+  int err = 0;
+  if ((err = posix_fadvise(fd, (off_t)offset, (off_t)len, flags))) {
+    throw_ioe(env, err);
+  }
+#endif
+}
+
+#if defined(HAVE_SYNC_FILE_RANGE)
+#  define my_sync_file_range sync_file_range
+#elif defined(SYS_sync_file_range)
+// RHEL 5 kernels have sync_file_range support, but the bundled
+// glibc does not provide the library wrapper. We can still invoke
+// the syscall directly; if the kernel does not support it, we get
+// ENOSYS. See RedHat Bugzilla #518581.
+static int manual_sync_file_range (int fd, __off64_t from, __off64_t to, unsigned int flags)
+{
+#ifdef __x86_64__
+  return syscall( SYS_sync_file_range, fd, from, to, flags);
+#else
+  return syscall (SYS_sync_file_range, fd,
+    __LONG_LONG_PAIR ((long) (from >> 32), (long) from),
+    __LONG_LONG_PAIR ((long) (to >> 32), (long) to),
+    flags);
+#endif
+}
+#define my_sync_file_range manual_sync_file_range
+#endif
+
+/**
+ * public static native void sync_file_range(
+ *   FileDescriptor fd, long offset, long len, int flags);
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_sync_1file_1range(
+  JNIEnv *env, jclass clazz,
+  jobject fd_object, jlong offset, jlong len, jint flags)
+{
+#ifndef my_sync_file_range
+  THROW(env, "java/lang/UnsupportedOperationException",
+        "sync_file_range support not available");
+#else
+  int fd = fd_get(env, fd_object);
+  PASS_EXCEPTIONS(env);
+
+  if (my_sync_file_range(fd, (off_t)offset, (off_t)len, flags)) {
+    if (errno == ENOSYS) {
+      // we know the syscall number, but the running kernel
+      // does not implement the call
+      THROW(env, "java/lang/UnsupportedOperationException",
+            "sync_file_range kernel support not available");
+      return;
+    } else {
+      throw_ioe(env, errno);
+    }
+  }
+#endif
+}
+
 /*
  * Class:     org_apache_hadoop_io_nativeio_NativeIO_POSIX
  * Method:    open

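The two JNI entry points above back native methods on NativeIO whose Java signatures are quoted in their doc comments: posix_fadvise(FileDescriptor, long, long, int) and sync_file_range(FileDescriptor, long, long, int). Both throw UnsupportedOperationException when the platform lacks support, so callers should degrade gracefully. A hedged caller sketch; the POSIX_FADV_DONTNEED constant name is an assumption about the Java-side declarations, which this diff does not show:

    import java.io.FileDescriptor;
    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.nativeio.NativeIO;

    class FadviseSketch {
      static void dropCachedPages(FileInputStream in, long len) throws IOException {
        FileDescriptor fd = in.getFD();
        try {
          // advise the OS that these pages will not be reused
          // (flag constant name assumed, not shown in this diff)
          NativeIO.posix_fadvise(fd, 0, len, NativeIO.POSIX_FADV_DONTNEED);
        } catch (UnsupportedOperationException uoe) {
          // fadvise unavailable on this platform: skipping is safe
        }
      }
    }
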
Modified: hadoop/common/branches/branch-1-win/src/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c (original)
+++ hadoop/common/branches/branch-1-win/src/native/src/org/apache/hadoop/io/nativeio/file_descriptor.c Fri Jun 21 06:37:27 2013
@@ -68,6 +68,11 @@ void fd_deinit(JNIEnv *env) {
  * underlying fd, or throw if unavailable
  */
 int fd_get(JNIEnv* env, jobject obj) {
+  if (obj == NULL) {
+    THROW(env, "java/lang/NullPointerException",
+          "FileDescriptor object is null");
+    return -1;
+  }
   return (*env)->GetIntField(env, obj, fd_descriptor);
 }
 
@@ -102,4 +107,4 @@ jobject fd_create(JNIEnv *env, long fd) 
   (*env)->SetLongField(env, obj, fd_handle, fd);
   return obj;
 }
-#endif
\ No newline at end of file
+#endif

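The fd_get() null guard matters because calling GetIntField on a NULL object is undefined behavior in JNI and can take down the whole JVM; with the check, a null FileDescriptor surfaces as an ordinary Java exception. A sketch of the visible effect (not a test from the patch):

    import org.apache.hadoop.io.nativeio.NativeIO;

    class NullFdGuardSketch {
      public static void main(String[] args) {
        try {
          // a null FileDescriptor now raises NullPointerException
          NativeIO.posix_fadvise(null, 0, 0, 0);
        } catch (NullPointerException expected) {
          // before the guard, GetIntField(NULL, ...) could crash the JVM
        } catch (UnsupportedOperationException uoe) {
          // platform without fadvise throws before the guard is reached
        }
      }
    }
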
Modified: hadoop/common/branches/branch-1-win/src/packages/templates/conf/commons-logging.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/packages/templates/conf/commons-logging.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/packages/templates/conf/commons-logging.properties (original)
+++ hadoop/common/branches/branch-1-win/src/packages/templates/conf/commons-logging.properties Fri Jun 21 06:37:27 2013
@@ -1,3 +1,16 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+
 #Logging Implementation
 
 #Log4J

Modified: hadoop/common/branches/branch-1-win/src/packages/templates/conf/hdfs-site.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/packages/templates/conf/hdfs-site.xml?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/packages/templates/conf/hdfs-site.xml (original)
+++ hadoop/common/branches/branch-1-win/src/packages/templates/conf/hdfs-site.xml Fri Jun 21 06:37:27 2013
@@ -130,7 +130,7 @@
       The HTTP Kerberos principal used by Hadoop-Auth in the HTTP endpoint.
 
       The HTTP Kerberos principal MUST start with 'HTTP/' per Kerberos
-      HTTP SPENGO specification.
+      HTTP SPNEGO specification.
     </description>
   </property>
 

Modified: hadoop/common/branches/branch-1-win/src/packages/templates/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/packages/templates/conf/log4j.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/packages/templates/conf/log4j.properties (original)
+++ hadoop/common/branches/branch-1-win/src/packages/templates/conf/log4j.properties Fri Jun 21 06:37:27 2013
@@ -83,7 +83,7 @@ log4j.appender.TLA.layout.ConversionPatt
 #
 hadoop.security.logger=INFO,console
 log4j.category.SecurityLogger=${hadoop.security.logger}
-hadoop.security.log.file=SecurityAuth.audit
+hadoop.security.log.file=SecurityAuth-${user.name}.audit
 log4j.appender.DRFAS=org.apache.log4j.DailyRollingFileAppender 
 log4j.appender.DRFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
 log4j.appender.DRFAS.layout=org.apache.log4j.PatternLayout


