hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1077730 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/split/ test/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapreduce/split/
Date: Fri, 04 Mar 2011 04:49:13 GMT
Author: omalley
Date: Fri Mar  4 04:49:13 2011
New Revision: 1077730

URL: http://svn.apache.org/viewvc?rev=1077730&view=rev
Log:
commit 51be5c3d61cbc7960174493428fbaa41d5fbe84d
Author: Chris Douglas <cdouglas@apache.org>
Date:   Fri Oct 1 01:49:51 2010 -0700

     Change client-side enforcement of limit on locations per split
    to be advisory. Truncate on client, optionally fail job at JobTracker if
    exceeded. Added mapreduce.job.max.split.locations property.
    
    +++ b/YAHOO-CHANGES.txt
    +     Change client-side enforcement of limit on locations per split
    +    to be advisory. Truncate on client, optionally fail job at JobTracker if
    +    exceeded. Added mapreduce.job.max.split.locations property. (cdouglas)
    +

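For anyone picking this change up: the new knob is an ordinary job property. A hypothetical submission that raises the advisory limit (the class name and paths below are placeholders, not part of this commit; the property name and its client-side default of 10 come from the JobSplitWriter diff further down):

    // Sketch only: raise the per-split location limit for one job.
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class SubmitWithSplitLocationLimit {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(SubmitWithSplitLocationLimit.class);
        // Permit up to 50 locations per split before the client truncates
        // (the JobTracker may still fail the job if its own limit is lower).
        job.setInt("mapreduce.job.max.split.locations", 50);
        FileInputFormat.setInputPaths(job, new Path("in"));
        FileOutputFormat.setOutputPath(job, new Path("out"));
        JobClient.runJob(job);
      }
    }
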
Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/split/
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/split/TestBlockLimits.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java
Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestBlockLimits.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077730&r1=1077729&r2=1077730&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 04:49:13 2011
@@ -775,7 +775,8 @@ public class JobInProgress {
   TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
   throws IOException {
     TaskSplitMetaInfo[] allTaskSplitMetaInfo =
-      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
+      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, jobtracker.getConf(),
+          jobSubmitDir);
     return allTaskSplitMetaInfo;
   }
 

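The hunk above is easy to miss but important: createSplits() now reads the split meta info with the JobTracker's own configuration instead of the job's, so the reader-side limit cannot be relaxed by the submitting client. A minimal sketch of that precedence (the class name and the value 1000 are illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class ConfPrecedenceSketch {
      static final String KEY = "mapreduce.job.max.split.locations";
      public static void main(String[] args) {
        Configuration jobConf = new Configuration(false);
        jobConf.setInt(KEY, 1000);                   // what the client asked for
        Configuration clusterConf = new Configuration(false); // jobtracker.getConf()
        // readSplitMetaInfo() is now handed clusterConf, so it sees the
        // cluster's value (or Integer.MAX_VALUE if the admin set nothing),
        // not the client's 1000:
        System.out.println(clusterConf.getInt(KEY, Integer.MAX_VALUE));
      }
    }
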
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java?rev=1077730&r1=1077729&r2=1077730&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java Fri Mar  4 04:49:13 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.spli
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -35,15 +36,19 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * The class that is used by the Job clients to write splits (both the meta
  * and the raw bytes parts)
  */
 public class JobSplitWriter {
 
+  private static final Log LOG = LogFactory.getLog(JobSplitWriter.class);
   private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
   private static final byte[] SPLIT_FILE_HEADER;
-  private static final int MAX_BLOCK_LOCATIONS = 100;
+  static final String MAX_SPLIT_LOCATIONS = "mapreduce.job.max.split.locations";
   
   static {
     try {
@@ -121,10 +126,12 @@ public class JobSplitWriter {
         serializer.serialize(split);
         int currCount = out.size();
         String[] locations = split.getLocations();
-        if (locations.length > MAX_BLOCK_LOCATIONS) {
-          throw new IOException("Max block location exceeded for split: "
+        final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
+        if (locations.length > max_loc) {
+          LOG.warn("Max block location exceeded for split: "
               + split + " splitsize: " + locations.length +
-              " maxsize: " + MAX_BLOCK_LOCATIONS);
+              " maxsize: " + max_loc);
+          locations = Arrays.copyOf(locations, max_loc);
         }
         info[i++] = 
           new JobSplit.SplitMetaInfo( 
@@ -149,10 +156,12 @@ public class JobSplitWriter {
         split.write(out);
         int currLen = out.size();
         String[] locations = split.getLocations();
-        if (locations.length > MAX_BLOCK_LOCATIONS) {
-          throw new IOException("Max block location exceeded for split: "
+        final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
+        if (locations.length > max_loc) {
+          LOG.warn("Max block location exceeded for split: "
               + split + " splitsize: " + locations.length +
-              " maxsize: " + MAX_BLOCK_LOCATIONS);
+              " maxsize: " + max_loc);
+          locations = Arrays.copyOf(locations, max_loc);
         }
         info[i++] = new JobSplit.SplitMetaInfo( 
             locations, offset,

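For reference, the client-side behavior both hunks now share, reduced to a self-contained sketch (the hostnames and the limit of 3 are invented; the in-tree default is 10, per the conf.getInt calls above):

    import java.util.Arrays;

    public class TruncateLocationsSketch {
      public static void main(String[] args) {
        String[] locations = {"h1", "h2", "h3", "h4", "h5"};
        int maxLoc = 3; // conf.getInt("mapreduce.job.max.split.locations", 10)
        if (locations.length > maxLoc) {
          // Advisory: warn and truncate instead of failing the submission.
          System.err.println("Max block location exceeded: splitsize "
              + locations.length + ", maxsize " + maxLoc + "; truncating");
          locations = Arrays.copyOf(locations, maxLoc);
        }
        System.out.println(Arrays.toString(locations)); // [h1, h2, h3]
      }
    }
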
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java?rev=1077730&r1=1077729&r2=1077730&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java Fri Mar  4 04:49:13 2011
@@ -62,9 +62,16 @@ public class SplitMetaInfoReader {
     int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
     JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = 
       new JobSplit.TaskSplitMetaInfo[numSplits];
+    final int maxLocations =
+      conf.getInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, Integer.MAX_VALUE);
     for (int i = 0; i < numSplits; i++) {
       JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
       splitMetaInfo.readFields(in);
+      final int numLocations = splitMetaInfo.getLocations().length;
+      if (numLocations > maxLocations) {
+        throw new IOException("Max block location exceeded for split: #"  + i +
+              " splitsize: " + numLocations + " maxsize: " + maxLocations);
+      }
       JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
           JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString(), 
           splitMetaInfo.getStartOffset());

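Note the asymmetric defaults: the writer truncates above 10 unless the job raises mapreduce.job.max.split.locations, while the reader falls back to Integer.MAX_VALUE, so the JobTracker only rejects a job when its site configuration actually sets the property. A sketch of just those two lookups (class name illustrative; defaults copied from the diffs above):

    import org.apache.hadoop.conf.Configuration;

    public class SplitLocationDefaultsSketch {
      static final String KEY = "mapreduce.job.max.split.locations";
      public static void main(String[] args) {
        Configuration conf = new Configuration(false); // property unset anywhere
        int writerLimit = conf.getInt(KEY, 10);                // JobSplitWriter
        int readerLimit = conf.getInt(KEY, Integer.MAX_VALUE); // SplitMetaInfoReader
        System.out.println("writer truncates above " + writerLimit);
        System.out.println("reader rejects above " + readerLimit);
      }
    }
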
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/split/TestBlockLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/split/TestBlockLimits.java?rev=1077730&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/split/TestBlockLimits.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/split/TestBlockLimits.java Fri Mar  4 04:49:13 2011
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A JUnit test to test limits on block locations
+ */
+public class TestBlockLimits extends TestCase {
+  private static String TEST_ROOT_DIR =
+    new File(System.getProperty("test.build.data","/tmp"))
+    .toURI().toString().replace(' ', '+');
+    
+  public void testWithLimits()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    MiniMRCluster mr = null;
+    try {
+      mr = new MiniMRCluster(2, "file:///", 3);
+      Configuration conf = new Configuration();
+      conf.setInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, 10);
+      mr = new MiniMRCluster(2, "file:///", 3, null, null, new JobConf(conf));
+      runCustomFormat(mr);
+    } finally {
+      if (mr != null) { mr.shutdown(); }
+    }
+  }
+  
+  private void runCustomFormat(MiniMRCluster mr) throws IOException {
+    JobConf job = new JobConf(mr.createJobConf());
+    job.setInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, 100);
+    FileSystem fileSys = FileSystem.get(job);
+    Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
+    Path outDir = new Path(testDir, "out");
+    System.out.println("testDir= " + testDir);
+    fileSys.delete(testDir, true);
+    job.setInputFormat(MyInputFormat.class);
+    job.setOutputFormat(MyOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    
+    job.setMapperClass(MyMapper.class);        
+    job.setNumReduceTasks(0);
+    job.set("non.std.out", outDir.toString());
+    try {
+      JobClient.runJob(job);
+      fail("JobTracker neglected to fail misconfigured job");
+    } catch(IOException ie) {
+      System.out.println("Failed job " + StringUtils.stringifyException(ie));
+    } finally {
+      fileSys.delete(testDir, true);
+    }
+    
+  }
+  
+  static class MyMapper extends MapReduceBase
+    implements Mapper<WritableComparable, Writable,
+                    WritableComparable, Writable> {
+
+    public void map(WritableComparable key, Writable value,
+                  OutputCollector<WritableComparable, Writable> out,
+                  Reporter reporter) throws IOException {
+    }
+  }
+
+  private static class MyInputFormat
+    implements InputFormat<Text, Text> {
+    
+    private static class MySplit implements InputSplit {
+      int first;
+      int length;
+
+      public MySplit() { }
+
+      public MySplit(int first, int length) {
+        this.first = first;
+        this.length = length;
+      }
+
+      public String[] getLocations() {
+        final String[] ret = new String[200];
+        Arrays.fill(ret, "SPLIT");
+        return ret;
+      }
+
+      public long getLength() {
+        return length;
+      }
+
+      public void write(DataOutput out) throws IOException {
+        WritableUtils.writeVInt(out, first);
+        WritableUtils.writeVInt(out, length);
+      }
+
+      public void readFields(DataInput in) throws IOException {
+        first = WritableUtils.readVInt(in);
+        length = WritableUtils.readVInt(in);
+      }
+    }
+    
+    public InputSplit[] getSplits(JobConf job, 
+                                  int numSplits) throws IOException {
+      return new MySplit[]{new MySplit(0, 1), new MySplit(1, 3),
+                           new MySplit(4, 2)};
+    }
+
+    public RecordReader<Text, Text> getRecordReader(InputSplit split,
+                                                           JobConf job, 
+                                                           Reporter reporter)
+                                                           throws IOException {
+      MySplit sp = (MySplit) split;
+      return new RecordReader<Text,Text>() {
+        @Override public boolean next(Text key, Text value) { return false; }
+        @Override public Text createKey() { return new Text(); }
+        @Override public Text createValue() { return new Text(); }
+        @Override public long getPos() throws IOException { return 0; }
+        @Override public void close() throws IOException { }
+        @Override public float getProgress() throws IOException { return 1.0f; }
+      };
+    }
+    
+  }
+  
+
+  static class MyOutputFormat implements OutputFormat {
+    static class MyRecordWriter implements RecordWriter<Object, Object> {
+      private DataOutputStream out;
+      
+      public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
+      }
+      
+      public void write(Object key, Object value) throws IOException {
+        return;
+      }
+
+      public void close(Reporter reporter) throws IOException {
+      }
+    }
+    
+    public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, 
+                                        String name,
+                                        Progressable progress
+                                        ) throws IOException {
+      return new MyRecordWriter(new Path(job.get("non.std.out")), job);
+    }
+
+    public void checkOutputSpecs(FileSystem ignored, 
+                                 JobConf job) throws IOException {
+    }
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java?rev=1077730&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java Fri Mar  4 04:49:13 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+public class TestJobSplitWriter {
+
+  static final String TEST_ROOT = System.getProperty("test.build.data", "/tmp");
+  static final Path TEST_DIR =
+    new Path(TEST_ROOT, TestJobSplitWriter.class.getSimpleName());
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    final FileSystem fs = FileSystem.getLocal(new Configuration()).getRaw();
+    fs.delete(TEST_DIR, true);
+  }
+
+  static abstract class NewSplit extends InputSplit implements Writable {
+    @Override public long getLength() { return 42L; }
+    @Override public void readFields(DataInput in) throws IOException { }
+    @Override public void write(DataOutput in) throws IOException { }
+  }
+
+  @Test
+  public void testSplitLocationLimit()
+      throws IOException, InterruptedException  {
+    final int SPLITS = 5;
+    final int MAX_LOC = 10;
+    final Path outdir = new Path(TEST_DIR, "testSplitLocationLimit");
+    final String[] locs = getLoc(MAX_LOC + 5);
+    final Configuration conf = new Configuration();
+    final FileSystem rfs = FileSystem.getLocal(conf).getRaw();
+    final InputSplit split = new NewSplit() {
+      @Override public String[] getLocations() { return locs; }
+    };
+    List<InputSplit> splits = Collections.nCopies(SPLITS, split);
+
+    conf.setInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, MAX_LOC);
+    JobSplitWriter.createSplitFiles(outdir, conf,
+        FileSystem.getLocal(conf).getRaw(), splits);
+
+    checkMeta(MAX_LOC,
+        SplitMetaInfoReader.readSplitMetaInfo(null, rfs, conf, outdir),
+        Arrays.copyOf(locs, MAX_LOC));
+
+    conf.setInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, MAX_LOC / 2);
+    try {
+      SplitMetaInfoReader.readSplitMetaInfo(null, rfs, conf, outdir);
+      fail("Reader failed to detect location limit");
+    } catch (IOException e) { }
+  }
+
+  static abstract class OldSplit
+      implements org.apache.hadoop.mapred.InputSplit {
+    @Override public long getLength() { return 42L; }
+    @Override public void readFields(DataInput in) throws IOException { }
+    @Override public void write(DataOutput in) throws IOException { }
+  }
+
+  @Test
+  public void testSplitLocationLimitOldApi() throws IOException {
+    final int SPLITS = 5;
+    final int MAX_LOC = 10;
+    final Path outdir = new Path(TEST_DIR, "testSplitLocationLimitOldApi");
+    final String[] locs = getLoc(MAX_LOC + 5);
+    final Configuration conf = new Configuration();
+    final FileSystem rfs = FileSystem.getLocal(conf).getRaw();
+    final org.apache.hadoop.mapred.InputSplit split = new OldSplit() {
+      @Override public String[] getLocations() { return locs; }
+    };
+    org.apache.hadoop.mapred.InputSplit[] splits =
+      new org.apache.hadoop.mapred.InputSplit[SPLITS];
+    Arrays.fill(splits, split);
+
+    conf.setInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, MAX_LOC);
+    JobSplitWriter.createSplitFiles(outdir, conf,
+        FileSystem.getLocal(conf).getRaw(), splits);
+    checkMeta(MAX_LOC,
+        SplitMetaInfoReader.readSplitMetaInfo(null, rfs, conf, outdir),
+        Arrays.copyOf(locs, MAX_LOC));
+
+    conf.setInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, MAX_LOC / 2);
+    try {
+      SplitMetaInfoReader.readSplitMetaInfo(null, rfs, conf, outdir);
+      fail("Reader failed to detect location limit");
+    } catch (IOException e) { }
+  }
+
+  private static void checkMeta(int MAX_LOC,
+      JobSplit.TaskSplitMetaInfo[] metaSplits, String[] chk_locs) {
+    for (JobSplit.TaskSplitMetaInfo meta : metaSplits) {
+      final String[] meta_locs = meta.getLocations();
+      assertEquals(MAX_LOC, meta_locs.length);
+      assertArrayEquals(chk_locs, meta_locs);
+    }
+  }
+
+  private static String[] getLoc(int locations) {
+    final String ret[] = new String[locations];
+    for (int i = 0; i < locations; ++i) {
+      ret[i] = "LOC" + i;
+    }
+    return ret;
+  }
+
+}


