hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r906820 [2/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/map...
Date Fri, 05 Feb 2010 05:43:21 GMT
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java?rev=906820&r1=906819&r2=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java
Fri Feb  5 05:43:19 2010
@@ -74,7 +74,7 @@
     job.setMapOutputKeyClass(NullWritable.class);
     job.setMapOutputValueClass(NullWritable.class);
     job.setOutputFormat(NullOutputFormat.class);
-    job.setInputFormat(TestMapCollection.FakeIF.class);
+    job.setInputFormat(TestReduceFetchFromPartialMem.FakeIF.class);
     
     // run the job
     JobClient jc = new JobClient(mrConf);

Copied: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapCollection.java
(from r906813, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapCollection.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapCollection.java?p2=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapCollection.java&p1=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapCollection.java&r1=906813&r2=906820&rev=906820&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapCollection.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapCollection.java
Fri Feb  5 05:43:19 2010
@@ -16,289 +16,523 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapreduce;
 
 import junit.framework.TestCase;
 
 import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
-import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 
-public class TestMapCollection extends TestCase {
+public class TestMapCollection {
 
   private static final Log LOG = LogFactory.getLog(
       TestMapCollection.class.getName());
 
-  public static class KeyWritable
-      implements WritableComparable<KeyWritable>, JobConfigurable {
-
-    private final byte c = (byte)('K' & 0xFF);
-    static private boolean pedantic = false;
-    protected int expectedlen;
-
-    public void configure(JobConf conf) {
-      expectedlen = conf.getInt("test.keywritable.length", 1);
-      pedantic = conf.getBoolean("test.pedantic.verification", false);
-    }
-
-    public KeyWritable() { }
-
-    public KeyWritable(int len) {
-      this();
-      expectedlen = len;
+  public static abstract class FillWritable implements Writable, Configurable {
+    private int len;
+    protected boolean disableRead;
+    private byte[] b;
+    private final Random r;
+    protected final byte fillChar;
+    public FillWritable(byte fillChar) {
+      this.fillChar = fillChar;
+      r = new Random();
+      final long seed = r.nextLong();
+      LOG.info("seed: " + seed);
+      r.setSeed(seed);
+    }
+    @Override
+    public Configuration getConf() {
+      return null;
     }
-
-    public int getLength() {
-      return expectedlen;
+    public void setLength(int len) {
+      this.len = len;
     }
-
-    public int compareTo(KeyWritable o) {
+    public int compareTo(FillWritable o) {
       if (o == this) return 0;
-      return expectedlen - o.getLength();
-    }
-
-    public boolean equals(Object o) {
-      if (o == this) return true;
-      if (!(o instanceof KeyWritable)) return false;
-      return 0 == compareTo((KeyWritable)o);
+      return len - o.len;
     }
-
+    @Override
     public int hashCode() {
-      return 37 * expectedlen;
+      return 37 * len;
     }
-
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof FillWritable)) return false;
+      return 0 == compareTo((FillWritable)o);
+    }
+    @Override
     public void readFields(DataInput in) throws IOException {
-      if (expectedlen != 0) {
-        int bytesread;
-        if (pedantic) {
-          for (int i = 0; i < expectedlen; ++i)
-            assertEquals("Invalid byte at " + i, c, in.readByte());
-          bytesread = expectedlen;
-        } else {
-          bytesread = in.skipBytes(expectedlen);
-        }
-        assertEquals("Too few bytes in record", expectedlen, bytesread);
+      if (disableRead) {
+        return;
+      }
+      len = WritableUtils.readVInt(in);
+      for (int i = 0; i < len; ++i) {
+        assertEquals("Invalid byte at " + i, fillChar, in.readByte());
       }
-      // cannot verify that the stream has been exhausted
     }
-
+    @Override
     public void write(DataOutput out) throws IOException {
-      if (expectedlen != 0) {
-        if (expectedlen > 1024) {
-          byte[] b = new byte[expectedlen];
-          Arrays.fill(b, c);
-          out.write(b);
-        } else {
-          for (int i = 0; i < expectedlen; ++i) {
-            out.write(c);
-          }
-        }
+      if (0 == len) {
+        return;
       }
-    }
-
-    public static class Comparator extends WritableComparator {
-      public Comparator() {
-        super(KeyWritable.class);
+      int written = 0;
+      if (!disableRead) {
+        WritableUtils.writeVInt(out, len);
+        written -= WritableUtils.getVIntSize(len);
       }
-
-      public int compare(byte[] b1, int s1, int l1,
-                         byte[] b2, int s2, int l2) {
-        if (pedantic) {
-          for (int i = s1; i < l1; ++i) {
-            assertEquals("Invalid key at " + s1, b1[i], (byte)('K' & 0xFF));
-          }
-          for (int i = s2; i < l2; ++i) {
-            assertEquals("Invalid key at " + s2, b2[i], (byte)('K' & 0xFF));
-          }
+      if (len > 1024) {
+        if (null == b || b.length < len) {
+          b = new byte[2 * len];
+        }
+        Arrays.fill(b, fillChar);
+        do {
+          final int write = Math.min(len - written, r.nextInt(len));
+          out.write(b, 0, write);
+          written += write;
+        } while (written < len);
+        assertEquals(len, written);
+      } else {
+        for (int i = written; i < len; ++i) {
+          out.write(fillChar);
         }
-        return l1 - l2;
       }
     }
-
-
-    static {
-      WritableComparator.define(KeyWritable.class, new Comparator());
-    }
   }
 
-  public static class ValWritable extends KeyWritable {
-
-    private final byte c = (byte)('V' & 0xFF);
-
-    public ValWritable() { }
-
-    public ValWritable(int len) {
-      this();
-      expectedlen = len;
-    }
+  public static class KeyWritable
+    extends FillWritable implements WritableComparable<FillWritable> {
 
-    public void configure(JobConf conf) {
-      expectedlen = conf.getInt("test.valwritable.length", 1);
+    static final byte keyFill = (byte)('K' & 0xFF);
+    public KeyWritable() {
+      super(keyFill);
+    }
+    @Override
+    public void setConf(Configuration conf) {
+      disableRead = conf.getBoolean("test.disable.key.read", false);
     }
   }
 
-  public static class SpillMapper
-      implements Mapper<NullWritable,NullWritable,KeyWritable,ValWritable> {
-
-    private int keylen = 1;
-    private int vallen = 1;
-    private int numrecs = 100;
-
-    public void configure(JobConf job) {
-      keylen = job.getInt("test.keywritable.length", 1);
-      vallen = job.getInt("test.valwritable.length", 1);
-      numrecs = job.getInt("test.spillmap.records", 100);
+  public static class ValWritable extends FillWritable {
+    public ValWritable() {
+      super((byte)('V' & 0xFF));
+    }
+    @Override
+    public void setConf(Configuration conf) {
+      disableRead = conf.getBoolean("test.disable.val.read", false);
     }
+  }
 
-    public void map(NullWritable key, NullWritable value,
-        OutputCollector<KeyWritable,ValWritable> out, Reporter reporter)
-        throws IOException {
-      KeyWritable k = new KeyWritable(keylen);
-      ValWritable v = new ValWritable(vallen);
-      for (int i = 0; i < numrecs; ++i) {
-        if ((i % 1000) == 0) {
-          reporter.progress();
-        }
-        out.collect(k, v);
+  public static class VariableComparator
+      implements RawComparator<KeyWritable>, Configurable {
+    private boolean readLen;
+    public VariableComparator() { }
+    @Override
+    public void setConf(Configuration conf) {
+      readLen = !conf.getBoolean("test.disable.key.read", false);
+    }
+    @Override
+    public Configuration getConf() { return null; }
+    public int compare(KeyWritable k1, KeyWritable k2) {
+      return k1.compareTo(k2);
+    }
+    @Override
+    public int compare(byte[] b1, int s1, int l1,
+                       byte[] b2, int s2, int l2) {
+      final int n1;
+      final int n2;
+      if (readLen) {
+        n1 = WritableUtils.decodeVIntSize(b1[s1]);
+        n2 = WritableUtils.decodeVIntSize(b2[s2]);
+      } else {
+        n1 = 0;
+        n2 = 0;
+      }
+      for (int i = s1 + n1; i < l1 - n1; ++i) {
+        assertEquals("Invalid key at " + s1, (int)KeyWritable.keyFill, b1[i]);
+      }
+      for (int i = s2 + n2; i < l2 - n2; ++i) {
+        assertEquals("Invalid key at " + s2, (int)KeyWritable.keyFill, b2[i]);
       }
+      return l1 - l2;
     }
-
-    public void close() { }
-
   }
 
   public static class SpillReducer
-      implements Reducer<KeyWritable,ValWritable,NullWritable,NullWritable> {
+      extends Reducer<KeyWritable,ValWritable,NullWritable,NullWritable> {
 
-    private int numrecs = 100;
-
-    public void configure(JobConf job) {
-      numrecs = job.getInt("test.spillmap.records", 100);
-    }
+    private int numrecs;
+    private int expected;
 
-    public void reduce(KeyWritable k, Iterator<ValWritable> values,
-        OutputCollector<NullWritable,NullWritable> out, Reporter reporter) {
-      int i = 0;
-      while (values.hasNext()) {
-        values.next();
-        ++i;
+    @Override
+    protected void setup(Context job) {
+      numrecs = 0;
+      expected = job.getConfiguration().getInt("test.spillmap.records", 100);
+    }
+
+    @Override
+    protected void reduce(KeyWritable k, Iterable<ValWritable> values,
+        Context context) throws IOException, InterruptedException {
+      for (ValWritable val : values) {
+        ++numrecs;
       }
-      assertEquals("Unexpected record count (" + i + "/" +
-                   numrecs + ")", numrecs, i);
     }
-
-    public void close() { }
-
+    @Override
+    protected void cleanup(Context context)
+        throws IOException, InterruptedException {
+      assertEquals("Unexpected record count", expected, numrecs);
+    }
   }
 
-  public static class FakeSplit implements InputSplit {
+  public static class FakeSplit extends InputSplit implements Writable {
+    @Override
     public void write(DataOutput out) throws IOException { }
+    @Override
     public void readFields(DataInput in) throws IOException { }
+    @Override
     public long getLength() { return 0L; }
+    @Override
     public String[] getLocations() { return new String[0]; }
   }
 
-  public static class FakeIF
-      implements InputFormat<NullWritable,NullWritable> {
+  public abstract static class RecordFactory implements Configurable {
+    public Configuration getConf() { return null; }
+    public abstract int keyLen(int i);
+    public abstract int valLen(int i);
+  }
+
+  public static class FixedRecordFactory extends RecordFactory {
+    private int keylen;
+    private int vallen;
+    public FixedRecordFactory() { }
+    public void setConf(Configuration conf) {
+      keylen = conf.getInt("test.fixedrecord.keylen", 0);
+      vallen = conf.getInt("test.fixedrecord.vallen", 0);
+    }
+    public int keyLen(int i) { return keylen; }
+    public int valLen(int i) { return vallen; }
+    public static void setLengths(Configuration conf, int keylen, int vallen) {
+      conf.setInt("test.fixedrecord.keylen", keylen);
+      conf.setInt("test.fixedrecord.vallen", vallen);
+      conf.setBoolean("test.disable.key.read", 0 == keylen);
+      conf.setBoolean("test.disable.val.read", 0 == vallen);
+    }
+  }
+
+  public static class FakeIF extends InputFormat<KeyWritable,ValWritable> {
 
     public FakeIF() { }
 
-    public InputSplit[] getSplits(JobConf conf, int numSplits) {
-      InputSplit[] splits = new InputSplit[numSplits];
-      for (int i = 0; i < splits.length; ++i) {
-        splits[i] = new FakeSplit();
+    @Override
+    public List<InputSplit> getSplits(JobContext ctxt) throws IOException {
+      final int numSplits = ctxt.getConfiguration().getInt(
+          "test.mapcollection.num.maps", -1);
+      List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+      for (int i = 0; i < numSplits; ++i) {
+        splits.add(i, new FakeSplit());
       }
       return splits;
     }
 
-    public RecordReader<NullWritable,NullWritable> getRecordReader(
-        InputSplit ignored, JobConf conf, Reporter reporter) {
-      return new RecordReader<NullWritable,NullWritable>() {
-        private boolean done = false;
-        public boolean next(NullWritable key, NullWritable value)
-            throws IOException {
-          if (done)
-            return false;
-          done = true;
-          return true;
+    public RecordReader<KeyWritable,ValWritable> createRecordReader(
+        InputSplit ignored, final TaskAttemptContext taskContext) {
+      return new RecordReader<KeyWritable,ValWritable>() {
+        private RecordFactory factory;
+        private final KeyWritable key = new KeyWritable();
+        private final ValWritable val = new ValWritable();
+        private int current;
+        private int records;
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext context) {
+          final Configuration conf = context.getConfiguration();
+          key.setConf(conf);
+          val.setConf(conf);
+          factory = ReflectionUtils.newInstance(
+              conf.getClass("test.mapcollection.class",
+                FixedRecordFactory.class, RecordFactory.class), conf);
+          assertNotNull(factory);
+          current = 0;
+          records = conf.getInt("test.spillmap.records", 100);
+        }
+        @Override
+        public boolean nextKeyValue() {
+          key.setLength(factory.keyLen(current));
+          val.setLength(factory.valLen(current));
+          return current++ < records;
+        }
+        @Override
+        public KeyWritable getCurrentKey() { return key; }
+        @Override
+        public ValWritable getCurrentValue() { return val; }
+        @Override
+        public float getProgress() { return (float) current / records; }
+        @Override
+        public void close() {
+          assertEquals("Unexpected count", records, current - 1);
         }
-        public NullWritable createKey() { return NullWritable.get(); }
-        public NullWritable createValue() { return NullWritable.get(); }
-        public long getPos() throws IOException { return 0L; }
-        public void close() throws IOException { }
-        public float getProgress() throws IOException { return 0.0f; }
       };
     }
   }
 
   private static void runTest(String name, int keylen, int vallen,
-      int records, int ioSortMB, float recPer, float spillPer,
-      boolean pedantic) throws Exception {
-    JobConf conf = new JobConf(new Configuration(), SpillMapper.class);
-
+      int records, int ioSortMB, float spillPer)
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
+    Job job = Job.getInstance(new Cluster(conf), conf);
+    conf = job.getConfiguration();
     conf.setInt(JobContext.IO_SORT_MB, ioSortMB);
-    conf.set(JobContext.MAP_SORT_RECORD_PERCENT, Float.toString(recPer));
     conf.set(JobContext.MAP_SORT_SPILL_PERCENT, Float.toString(spillPer));
-
-    conf.setInt("test.keywritable.length", keylen);
-    conf.setInt("test.valwritable.length", vallen);
+    conf.setClass("test.mapcollection.class", FixedRecordFactory.class,
+        RecordFactory.class);
+    FixedRecordFactory.setLengths(conf, keylen, vallen);
     conf.setInt("test.spillmap.records", records);
-    conf.setBoolean("test.pedantic.verification", pedantic);
+    runTest(name, job);
+  }
 
-    conf.setNumMapTasks(1);
-    conf.setNumReduceTasks(1);
-    conf.setInputFormat(FakeIF.class);
-    conf.setOutputFormat(NullOutputFormat.class);
-    conf.setMapperClass(SpillMapper.class);
-    conf.setReducerClass(SpillReducer.class);
-    conf.setMapOutputKeyClass(KeyWritable.class);
-    conf.setMapOutputValueClass(ValWritable.class);
+  private static void runTest(String name, Job job) throws Exception {
+    job.setNumReduceTasks(1);
+    job.getConfiguration().setInt(JobContext.IO_SORT_FACTOR, 1000);
+    job.getConfiguration().set("fs.default.name", "file:///");
+    job.getConfiguration().setInt("test.mapcollection.num.maps", 1);
+    job.setInputFormatClass(FakeIF.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setMapperClass(Mapper.class);
+    job.setReducerClass(SpillReducer.class);
+    job.setMapOutputKeyClass(KeyWritable.class);
+    job.setMapOutputValueClass(ValWritable.class);
+    job.setSortComparatorClass(VariableComparator.class);
 
     LOG.info("Running " + name);
-    JobClient.runJob(conf);
+    assertTrue("Job failed!", job.waitForCompletion(false));
   }
 
-  private static void runTest(String name, int keylen, int vallen, int records,
-      boolean pedantic) throws Exception {
-    runTest(name, keylen, vallen, records, 1, 0.05f, .8f, pedantic);
-  }
-
-  public void testLastFill() throws Exception {
+  @Test
+  public void testValLastByte() throws Exception {
     // last byte of record/key is the last/first byte in the spill buffer
-    runTest("vallastbyte", 128, 896, 1344, 1, 0.125f, 0.5f, true);
-    runTest("keylastbyte", 512, 1024, 896, 1, 0.125f, 0.5f, true);
+    runTest("vallastbyte", 128, 896, 1344, 1, 0.5f);
+    runTest("keylastbyte", 512, 1024, 896, 1, 0.5f);
   }
 
+  @Test
   public void testLargeRecords() throws Exception {
     // maps emitting records larger than mapreduce.task.io.sort.mb
-    runTest("largerec", 100, 1024*1024, 5, false);
-    runTest("largekeyzeroval", 1024*1024, 0, 5, false);
+    runTest("largerec", 100, 1024*1024, 5, 1, .8f);
+    runTest("largekeyzeroval", 1024*1024, 0, 5, 1, .8f);
   }
 
-  public void testSpillPer() throws Exception {
+  @Test
+  public void testSpillPer2B() throws Exception {
     // set non-default, 100% speculative spill boundary
-    runTest("fullspill2B", 1, 1, 10000, 1, 0.05f, 1.0f, true);
-    runTest("fullspill200B", 100, 100, 10000, 1, 0.05f, 1.0f, true);
-    runTest("fullspillbuf", 10 * 1024, 20 * 1024, 256, 1, 0.3f, 1.0f, true);
-    runTest("lt50perspill", 100, 100, 10000, 1, 0.05f, 0.3f, true);
+    runTest("fullspill2B", 1, 1, 10000, 1, 1.0f);
+    runTest("fullspill200B", 100, 100, 10000, 1, 1.0f);
+    runTest("fullspillbuf", 10 * 1024, 20 * 1024, 256, 1, 1.0f);
+    runTest("lt50perspill", 100, 100, 10000, 1, 0.3f);
   }
 
-  public void testZeroLength() throws Exception {
+  @Test
+  public void testZeroVal() throws Exception {
     // test key/value at zero-length
-    runTest("zeroval", 1, 0, 10000, true);
-    runTest("zerokey", 0, 1, 10000, true);
-    runTest("zerokeyval", 0, 0, 10000, false);
-    runTest("zerokeyvalfull", 0, 0, 10000, 1, 0.05f, 1.0f, false);
+    runTest("zeroval", 1, 0, 10000, 1, .8f);
+    runTest("zerokey", 0, 1, 10000, 1, .8f);
+    runTest("zerokeyval", 0, 0, 10000, 1, .8f);
+    runTest("zerokeyvalfull", 0, 0, 10000, 1, 1.0f);
+  }
+
+  @Test
+  public void testSingleRecord() throws Exception {
+    runTest("singlerecord", 100, 100, 1, 1, 1.0f);
+    runTest("zerokeyvalsingle", 0, 0, 1, 1, 1.0f);
+  }
+
+  @Test
+  public void testLowSpill() throws Exception {
+    runTest("lowspill", 4000, 96, 20, 1, 0.00390625f);
+  }
+
+  @Test
+  public void testSplitMetaSpill() throws Exception {
+    runTest("splitmetaspill", 7, 1, 131072, 1, 0.8f);
+  }
+
+  public static class StepFactory extends RecordFactory {
+    public int prekey;
+    public int postkey;
+    public int preval;
+    public int postval;
+    public int steprec;
+    public void setConf(Configuration conf) {
+      prekey = conf.getInt("test.stepfactory.prekey", 0);
+      postkey = conf.getInt("test.stepfactory.postkey", 0);
+      preval = conf.getInt("test.stepfactory.preval", 0);
+      postval = conf.getInt("test.stepfactory.postval", 0);
+      steprec = conf.getInt("test.stepfactory.steprec", 0);
+    }
+    public static void setLengths(Configuration conf, int prekey, int postkey,
+        int preval, int postval, int steprec) {
+      conf.setInt("test.stepfactory.prekey", prekey);
+      conf.setInt("test.stepfactory.postkey", postkey);
+      conf.setInt("test.stepfactory.preval", preval);
+      conf.setInt("test.stepfactory.postval", postval);
+      conf.setInt("test.stepfactory.steprec", steprec);
+    }
+    public int keyLen(int i) {
+      return i > steprec ? postkey : prekey;
+    }
+    public int valLen(int i) {
+      return i > steprec ? postval : preval;
+    }
+  }
+
+  @Test
+  public void testPostSpillMeta() throws Exception {
+    // write larger records until spill, then write records that generate
+    // no writes into the serialization buffer
+    Configuration conf = new Configuration();
+    conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
+    Job job = Job.getInstance(new Cluster(conf), conf);
+    conf = job.getConfiguration();
+    conf.setInt(JobContext.IO_SORT_MB, 1);
+    // 2^20 * spill = 14336 bytes available post-spill, at most 896 meta
+    conf.set(JobContext.MAP_SORT_SPILL_PERCENT, Float.toString(.986328125f));
+    conf.setClass("test.mapcollection.class", StepFactory.class,
+        RecordFactory.class);
+    StepFactory.setLengths(conf, 4000, 0, 96, 0, 252);
+    conf.setInt("test.spillmap.records", 1000);
+    conf.setBoolean("test.disable.key.read", true);
+    conf.setBoolean("test.disable.val.read", true);
+    runTest("postspillmeta", job);
+  }
+
+  @Test
+  public void testLargeRecConcurrent() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
+    Job job = Job.getInstance(new Cluster(conf), conf);
+    conf = job.getConfiguration();
+    conf.setInt(JobContext.IO_SORT_MB, 1);
+    conf.set(JobContext.MAP_SORT_SPILL_PERCENT, Float.toString(.986328125f));
+    conf.setClass("test.mapcollection.class", StepFactory.class,
+        RecordFactory.class);
+    StepFactory.setLengths(conf, 4000, 261120, 96, 1024, 251);
+    conf.setInt("test.spillmap.records", 255);
+    conf.setBoolean("test.disable.key.read", false);
+    conf.setBoolean("test.disable.val.read", false);
+    runTest("largeconcurrent", job);
+  }
+
+  public static class RandomFactory extends RecordFactory {
+    public int minkey;
+    public int maxkey;
+    public int minval;
+    public int maxval;
+    private final Random r = new Random();
+    private static int nextRand(Random r, int max) {
+      return (int)Math.exp(r.nextDouble() * Math.log(max));
+    }
+    public void setConf(Configuration conf) {
+      r.setSeed(conf.getLong("test.randomfactory.seed", 0L));
+      minkey = conf.getInt("test.randomfactory.minkey", 0);
+      maxkey = conf.getInt("test.randomfactory.maxkey", 0) - minkey;
+      minval = conf.getInt("test.randomfactory.minval", 0);
+      maxval = conf.getInt("test.randomfactory.maxval", 0) - minval;
+    }
+    public static void setLengths(Configuration conf, Random r, int max) {
+      int k1 = nextRand(r, max);
+      int k2 = nextRand(r, max);
+      if (k1 > k2) {
+        final int tmp = k1;
+        k1 = k2;
+        k2 = tmp;
+      }
+      int v1 = nextRand(r, max);
+      int v2 = nextRand(r, max);
+      if (v1 > v2) {
+        final int tmp = v1;
+        v1 = v2;
+        v2 = tmp;
+      }
+      setLengths(conf, k1, ++k2, v1, ++v2);
+    }
+    public static void setLengths(Configuration conf, int minkey, int maxkey,
+        int minval, int maxval) {
+      assert minkey < maxkey;
+      assert minval < maxval;
+      conf.setInt("test.randomfactory.minkey", minkey);
+      conf.setInt("test.randomfactory.maxkey", maxkey);
+      conf.setInt("test.randomfactory.minval", minval);
+      conf.setInt("test.randomfactory.maxval", maxval);
+      conf.setBoolean("test.disable.key.read", minkey == 0);
+      conf.setBoolean("test.disable.val.read", minval == 0);
+    }
+    public int keyLen(int i) {
+      return minkey + nextRand(r, maxkey - minkey);
+    }
+    public int valLen(int i) {
+      return minval + nextRand(r, maxval - minval);
+    }
+  }
+
+  @Test
+  public void testRandom() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
+    Job job = Job.getInstance(new Cluster(conf), conf);
+    conf = job.getConfiguration();
+    conf.setInt(JobContext.IO_SORT_MB, 1);
+    conf.setClass("test.mapcollection.class", RandomFactory.class,
+        RecordFactory.class);
+    final Random r = new Random();
+    final long seed = r.nextLong();
+    LOG.info("SEED: " + seed);
+    r.setSeed(seed);
+    conf.set(JobContext.MAP_SORT_SPILL_PERCENT,
+        Float.toString(Math.max(0.1f, r.nextFloat())));
+    RandomFactory.setLengths(conf, r, 1 << 14);
+    conf.setInt("test.spillmap.records", r.nextInt(500));
+    conf.setLong("test.randomfactory.seed", r.nextLong());
+    runTest("random", job);
+  }
+
+  @Test
+  public void testRandomCompress() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
+    Job job = Job.getInstance(new Cluster(conf), conf);
+    conf = job.getConfiguration();
+    conf.setInt(JobContext.IO_SORT_MB, 1);
+    conf.setBoolean(JobContext.MAP_OUTPUT_COMPRESS, true);
+    conf.setClass("test.mapcollection.class", RandomFactory.class,
+        RecordFactory.class);
+    final Random r = new Random();
+    final long seed = r.nextLong();
+    LOG.info("SEED: " + seed);
+    r.setSeed(seed);
+    conf.set(JobContext.MAP_SORT_SPILL_PERCENT,
+        Float.toString(Math.max(0.1f, r.nextFloat())));
+    RandomFactory.setLengths(conf, r, 1 << 14);
+    conf.setInt("test.spillmap.records", r.nextInt(500));
+    conf.setLong("test.randomfactory.seed", r.nextLong());
+    runTest("randomCompress", job);
   }
 
 }



Mime
View raw message