From: omalley@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Subject: svn commit: r1077079 [2/11] - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/contrib/ src/contrib/gridmix/ src/contrib/gridmix/ivy/ src/contrib/gridmix/src/ src/contrib/gridmix/src/java/ src/contrib/gridmix/src/java/org/ src/contrib/gr...
Date: Fri, 04 Mar 2011 03:38:23 -0000
Message-Id: <20110304033824.C0BFA2388C29@eris.apache.org>

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Formatter;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Synthetic job generated from a trace description.
+ */
+class GridmixJob implements Callable<Job>, Delayed {
+
+  public static final String JOBNAME = "GRIDMIX";
+  public static final String ORIGNAME = "gridmix.job.name.original";
+  public static final Log LOG = LogFactory.getLog(GridmixJob.class);
+
+  private static final ThreadLocal<Formatter> nameFormat =
+    new ThreadLocal<Formatter>() {
+      @Override
+      protected Formatter initialValue() {
+        final StringBuilder sb = new StringBuilder(JOBNAME.length() + 5);
+        sb.append(JOBNAME);
+        return new Formatter(sb);
+      }
+    };
+
+  private final int seq;
+  private final Path outdir;
+  protected final Job job;
+  private final JobStory jobdesc;
+  private final long submissionTimeNanos;
+
+  public GridmixJob(Configuration conf, long submissionMillis,
+      JobStory jobdesc, Path outRoot, int seq) throws IOException {
+    ((StringBuilder)nameFormat.get().out()).setLength(JOBNAME.length());
+    job = new Job(conf, nameFormat.get().format("%05d", seq).toString());
+    submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
+        submissionMillis, TimeUnit.MILLISECONDS);
+    this.jobdesc = jobdesc;
+    this.seq = seq;
+    outdir = new Path(outRoot, "" + seq);
+  }
+
+  protected GridmixJob(Configuration conf, long submissionMillis, String name)
+      throws IOException {
+    job = new Job(conf, name);
+    submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
+        submissionMillis, TimeUnit.MILLISECONDS);
+    jobdesc = null;
+    outdir = null;
+    seq = -1;
+  }
+
+  public String toString() {
+    return job.getJobName();
+  }
+
+  public long getDelay(TimeUnit unit) {
+    return unit.convert(submissionTimeNanos - System.nanoTime(),
+        TimeUnit.NANOSECONDS);
+  }
+
+  @Override
+  public int compareTo(Delayed other) {
+    if (this == other) {
+      return 0;
+    }
+    if (other instanceof GridmixJob) {
+      final long otherNanos = ((GridmixJob)other).submissionTimeNanos;
+      if (otherNanos < submissionTimeNanos) {
+        return 1;
+      }
+      if (otherNanos > submissionTimeNanos) {
+        return -1;
+      }
+      return id() - ((GridmixJob)other).id();
+    }
+    final long diff =
+      getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
+    return 0 == diff ? 0 : (diff > 0 ? 1 : -1);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    // not possible unless job is cloned; all jobs should be unique
+    return other instanceof GridmixJob && id() == ((GridmixJob)other).id();
+  }
+
+  @Override
+  public int hashCode() {
+    return id();
+  }
+
+  int id() {
+    return seq;
+  }
+
+  Job getJob() {
+    return job;
+  }
+
+  JobStory getJobDesc() {
+    return jobdesc;
+  }
+
+  public Job call() throws IOException, InterruptedException,
+                           ClassNotFoundException {
+    job.setMapperClass(GridmixMapper.class);
+    job.setReducerClass(GridmixReducer.class);
+    job.setNumReduceTasks(jobdesc.getNumberReduces());
+    job.setMapOutputKeyClass(GridmixKey.class);
+    job.setMapOutputValueClass(GridmixRecord.class);
+    job.setSortComparatorClass(GridmixKey.Comparator.class);
+    job.setGroupingComparatorClass(SpecGroupingComparator.class);
+    job.setInputFormatClass(GridmixInputFormat.class);
+    job.setOutputFormatClass(RawBytesOutputFormat.class);
+    job.setPartitionerClass(DraftPartitioner.class);
+    job.setJarByClass(GridmixJob.class);
+    job.getConfiguration().setInt("gridmix.job.seq", seq);
+    job.getConfiguration().set(ORIGNAME, null == jobdesc.getJobID()
+        ? "<unknown>" : jobdesc.getJobID().toString());
+    job.getConfiguration().setBoolean("mapred.used.genericoptionsparser", true);
+    FileInputFormat.addInputPath(job, new Path("ignored"));
+    FileOutputFormat.setOutputPath(job, outdir);
+    job.submit();
+    return job;
+  }
+
+  public static class DraftPartitioner<V> extends Partitioner<GridmixKey,V> {
+    public int getPartition(GridmixKey key, V value, int numReduceTasks) {
+      return key.getPartition();
+    }
+  }
+
+  public static class SpecGroupingComparator
+      implements RawComparator<GridmixKey> {
+    private final DataInputBuffer di = new DataInputBuffer();
+    private final byte[] reset = di.getData();
+    @Override
+    public int compare(GridmixKey g1, GridmixKey g2) {
+      final byte t1 = g1.getType();
+      final byte t2 = g2.getType();
+      if (t1 == GridmixKey.REDUCE_SPEC ||
+          t2 == GridmixKey.REDUCE_SPEC) {
+        return t1 - t2;
+      }
+      assert t1 == GridmixKey.DATA;
+      assert t2 == GridmixKey.DATA;
+      return g1.compareTo(g2);
+    }
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try {
+        final int ret;
+        di.reset(b1, s1, l1);
+        final int x1 = WritableUtils.readVInt(di);
+        di.reset(b2, s2, l2);
+        final int x2 = WritableUtils.readVInt(di);
+        final int t1 = b1[s1 + x1];
+        final int t2 = b2[s2 + x2];
+        if (t1 == GridmixKey.REDUCE_SPEC ||
+            t2 == GridmixKey.REDUCE_SPEC) {
+          ret = t1 - t2;
+        } else {
+          assert t1 == GridmixKey.DATA;
+          assert t2 == GridmixKey.DATA;
+          ret = WritableComparator.compareBytes(b1, s1, x1, b2, s2, x2);
+        }
+        di.reset(reset, 0, 0);
+        return ret;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public static class GridmixMapper
+      extends Mapper<NullWritable,GridmixRecord,GridmixKey,GridmixRecord> {
+
+    private double acc;
+    private double ratio;
+    private final ArrayList<RecordFactory> reduces =
+      new ArrayList<RecordFactory>();
+    private final Random r = new Random();
+
+    private final GridmixKey key = new GridmixKey();
+    private final GridmixRecord val = new GridmixRecord();
+
+    @Override
+    protected void setup(Context ctxt)
+        throws IOException, InterruptedException {
+      final Configuration conf = ctxt.getConfiguration();
+      final GridmixSplit split = (GridmixSplit) ctxt.getInputSplit();
+      final int maps = split.getMapCount();
+      final long[] reduceBytes = split.getOutputBytes();
+      final long[] reduceRecords = split.getOutputRecords();
+
+      long totalRecords = 0L;
+      final int nReduces = ctxt.getNumReduceTasks();
+      if (nReduces > 0) {
+        int idx = 0;
+        int id = split.getId();
+        for (int i = 0; i < nReduces; ++i) {
+          final GridmixKey.Spec spec = new GridmixKey.Spec();
+          if (i == id) {
+            spec.bytes_out = split.getReduceBytes(idx);
+            spec.rec_out = split.getReduceRecords(idx);
+            ++idx;
+            id += maps;
+          }
+          reduces.add(new IntermediateRecordFactory(
+              new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
+              i, reduceRecords[i], spec, conf));
+          totalRecords += reduceRecords[i];
+        }
+      } else {
+        reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
+            conf));
+        totalRecords = reduceRecords[0];
+      }
+      final long splitRecords = split.getInputRecords();
+      final long inputRecords = splitRecords <= 0 && split.getLength() >= 0
+        ? Math.max(1,
+          split.getLength() / conf.getInt("gridmix.missing.rec.size", 64*1024))
+        : splitRecords;
+      ratio = totalRecords / (1.0 * inputRecords);
+      acc = 0.0;
+    }
+
+    @Override
+    public void map(NullWritable ignored, GridmixRecord rec,
+        Context context) throws IOException, InterruptedException {
+      acc += ratio;
+      while (acc >= 1.0 && !reduces.isEmpty()) {
+        key.setSeed(r.nextLong());
+        val.setSeed(r.nextLong());
+        final int idx = r.nextInt(reduces.size());
+        final RecordFactory f = reduces.get(idx);
+        if (!f.next(key, val)) {
+          reduces.remove(idx);
+          continue;
+        }
+        context.write(key, val);
+        acc -= 1.0;
+      }
+    }
+
+    @Override
+    public void cleanup(Context context)
+        throws IOException, InterruptedException {
+      for (RecordFactory factory : reduces) {
+        key.setSeed(r.nextLong());
+        while (factory.next(key, val)) {
+          context.write(key, val);
+          key.setSeed(r.nextLong());
+        }
+      }
+    }
+  }
+
+  public static class GridmixReducer
+      extends Reducer<GridmixKey,GridmixRecord,NullWritable,GridmixRecord> {
+
+    private final Random r = new Random();
+    private final GridmixRecord val = new GridmixRecord();
+
+    private double acc;
+    private double ratio;
+    private RecordFactory factory;
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      if (!context.nextKey() ||
+           context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
+        throw new IOException("Missing reduce spec");
+      }
+      long outBytes = 0L;
+      long outRecords = 0L;
+      long inRecords = 0L;
+      for (GridmixRecord ignored : context.getValues()) {
+        final GridmixKey spec = context.getCurrentKey();
+        inRecords += spec.getReduceInputRecords();
+        outBytes += spec.getReduceOutputBytes();
+        outRecords += spec.getReduceOutputRecords();
+      }
+      if (0 == outRecords && inRecords > 0) {
+        LOG.info("Spec output bytes w/o records. Using input record count");
+        outRecords = inRecords;
+      }
+      factory =
+        new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
+      ratio = outRecords / (1.0 * inRecords);
+      acc = 0.0;
+    }
+    @Override
+    protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
+        Context context) throws IOException, InterruptedException {
+      for (GridmixRecord ignored : values) {
+        acc += ratio;
+        while (acc >= 1.0 && factory.next(null, val)) {
+          context.write(NullWritable.get(), val);
+          acc -= 1.0;
+        }
+      }
+    }
+    @Override
+    protected void cleanup(Context context)
+        throws IOException, InterruptedException {
+      val.setSeed(r.nextLong());
+      while (factory.next(null, val)) {
+        context.write(NullWritable.get(), val);
+        val.setSeed(r.nextLong());
+      }
+    }
+  }
+
+  static class GridmixRecordReader
+      extends RecordReader<NullWritable,GridmixRecord> {
+
+    private RecordFactory factory;
+    private final Random r = new Random();
+    private final GridmixRecord val = new GridmixRecord();
+
+    public GridmixRecordReader() { }
+
+    @Override
+    public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt)
+        throws IOException, InterruptedException {
+      final GridmixSplit split = (GridmixSplit)genericSplit;
+      final Configuration conf = ctxt.getConfiguration();
+      factory = new ReadRecordFactory(split.getLength(),
+          split.getInputRecords(), new FileQueue(split, conf), conf);
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException {
+      val.setSeed(r.nextLong());
+      return factory.next(null, val);
+    }
+    @Override
+    public float getProgress() throws IOException {
+      return factory.getProgress();
+    }
+    @Override
+    public NullWritable getCurrentKey() {
+      return NullWritable.get();
+    }
+    @Override
+    public GridmixRecord getCurrentValue() {
+      return val;
+    }
+    @Override
+    public void close() throws IOException {
+      factory.close();
+    }
+  }
+
+  static class GridmixInputFormat
+      extends InputFormat<NullWritable,GridmixRecord> {
+
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      return pullDescription(jobCtxt.getConfiguration().getInt(
+            "gridmix.job.seq", -1));
+    }
+    @Override
+    public RecordReader<NullWritable,GridmixRecord> createRecordReader(
+        InputSplit split, final TaskAttemptContext taskContext)
+        throws IOException {
+      return new GridmixRecordReader();
+    }
+  }
+
+  static class RawBytesOutputFormat<K>
+      extends FileOutputFormat<K,GridmixRecord> {
+
+    @Override
+    public RecordWriter<K,GridmixRecord> getRecordWriter(
+        TaskAttemptContext job) throws IOException {
+
+      Path file = getDefaultWorkFile(job, "");
+      FileSystem fs = file.getFileSystem(job.getConfiguration());
+      final FSDataOutputStream fileOut = fs.create(file, false);
+      return new RecordWriter<K,GridmixRecord>() {
+        @Override
+        public void write(K ignored, GridmixRecord value)
+            throws IOException {
+          value.writeRandom(fileOut, value.getSize());
+        }
+        @Override
+        public void close(TaskAttemptContext ctxt) throws IOException {
+          fileOut.close();
+        }
+      };
+    }
+  }
+
+  // TODO replace with ThreadLocal submitter?
+  private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
+    new ConcurrentHashMap<Integer,List<InputSplit>>();
+
+  static void pushDescription(int seq, List<InputSplit> splits) {
+    if (null != descCache.putIfAbsent(seq, splits)) {
+      throw new IllegalArgumentException("Description exists for id " + seq);
+    }
+  }
+
+  static List<InputSplit> pullDescription(int seq) {
+    return descCache.remove(seq);
+  }
+
+  // not necessary when using a ThreadLocal
+  static void clearAll() {
+    descCache.clear();
+  }
+
+  void buildSplits(FilePool inputDir) throws IOException {
+    long mapInputBytesTotal = 0L;
+    long mapOutputBytesTotal = 0L;
+    long mapOutputRecordsTotal = 0L;
+    final JobStory jobdesc = getJobDesc();
+    if (null == jobdesc) {
+      return;
+    }
+    final int maps = jobdesc.getNumberMaps();
+    final int reds = jobdesc.getNumberReduces();
+    for (int i = 0; i < maps; ++i) {
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+      mapInputBytesTotal += info.getInputBytes();
+      mapOutputBytesTotal += info.getOutputBytes();
+      mapOutputRecordsTotal += info.getOutputRecords();
+    }
+    final double[] reduceRecordRatio = new double[reds];
+    final double[] reduceByteRatio = new double[reds];
+    for (int i = 0; i < reds; ++i) {
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.REDUCE, i);
+      reduceByteRatio[i] = info.getInputBytes() / (1.0 * mapOutputBytesTotal);
+      reduceRecordRatio[i] =
+        info.getInputRecords() / (1.0 * mapOutputRecordsTotal);
+    }
+    final InputStriper striper = new InputStriper(inputDir, mapInputBytesTotal);
+    final List<InputSplit> splits = new ArrayList<InputSplit>();
+    for (int i = 0; i < maps; ++i) {
+      final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
+      final long[] specBytes = new long[nSpec];
+      final long[] specRecords = new long[nSpec];
+      for (int j = 0; j < nSpec; ++j) {
+        final TaskInfo info =
+          jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
+        specBytes[j] = info.getOutputBytes();
+        specRecords[j] = info.getOutputRecords();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
+              i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+        }
+      }
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+      splits.add(new GridmixSplit(striper.splitFor(inputDir,
+              info.getInputBytes(), 3), maps, i,
+            info.getInputBytes(), info.getInputRecords(),
+            info.getOutputBytes(), info.getOutputRecords(),
+            reduceByteRatio, reduceRecordRatio, specBytes, specRecords));
+    }
+    pushDescription(id(), splits);
+  }
+
+}
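GridmixJob's Delayed/compareTo implementation above is what makes the synthetic jobs schedulable: a java.util.concurrent.DelayQueue releases an element only once its getDelay() reaches zero, so jobs queued with trace-derived deadlines come out in submission-time order. A minimal, self-contained sketch of the same contract follows; TimedTask and the names below are illustrative stand-ins, not part of this commit, which presumably consumes GridmixJob from such a queue in the submitter component:

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    // Toy stand-in for GridmixJob: held back until a target nanosecond
    // timestamp passes, exactly as getDelay() above computes
    // submissionTimeNanos - System.nanoTime().
    class TimedTask implements Delayed {
      final String name;
      final long dueNanos;
      TimedTask(String name, long delayMillis) {
        this.name = name;
        this.dueNanos = System.nanoTime()
            + TimeUnit.NANOSECONDS.convert(delayMillis, TimeUnit.MILLISECONDS);
      }
      public long getDelay(TimeUnit unit) {
        return unit.convert(dueNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
      }
      public int compareTo(Delayed other) {
        final long diff =
          getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
        return 0 == diff ? 0 : (diff > 0 ? 1 : -1);
      }
    }

    public class DelayedDemo {
      public static void main(String[] args) throws InterruptedException {
        final DelayQueue<TimedTask> q = new DelayQueue<TimedTask>();
        q.put(new TimedTask("second", 200));
        q.put(new TimedTask("first", 100));
        // take() blocks until an element's delay elapses; order follows the
        // per-element deadlines, not insertion order.
        System.out.println(q.take().name);  // prints "first"
        System.out.println(q.take().name);  // prints "second"
      }
    }

As in GridmixJob.getDelay(), the deadline is measured against System.nanoTime(), so elapsed time rather than wall-clock time drives release.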
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.WritableComparator;
+
+class GridmixKey extends GridmixRecord {
+  static final byte REDUCE_SPEC = 0;
+  static final byte DATA = 1;
+
+  static final int META_BYTES = 1;
+
+  private byte type;
+  private int partition; // NOT serialized
+  private Spec spec = new Spec();
+
+  GridmixKey() {
+    this(DATA, 1, 0L);
+  }
+  GridmixKey(byte type, int size, long seed) {
+    super(size, seed);
+    this.type = type;
+    // setting type may change pcnt random bytes
+    setSize(size);
+  }
+
+  @Override
+  public int getSize() {
+    switch (type) {
+      case REDUCE_SPEC:
+        return super.getSize() + spec.getSize() + META_BYTES;
+      case DATA:
+        return super.getSize() + META_BYTES;
+      default:
+        throw new IllegalStateException("Invalid type: " + type);
+    }
+  }
+
+  @Override
+  public void setSize(int size) {
+    switch (type) {
+      case REDUCE_SPEC:
+        super.setSize(size - (META_BYTES + spec.getSize()));
+        break;
+      case DATA:
+        super.setSize(size - META_BYTES);
+        break;
+      default:
+        throw new IllegalStateException("Invalid type: " + type);
+    }
+  }
+
+  /**
+   * Partition is not serialized.
+   */
+  public int getPartition() {
+    return partition;
+  }
+  public void setPartition(int partition) {
+    this.partition = partition;
+  }
+
+  public long getReduceInputRecords() {
+    assert REDUCE_SPEC == getType();
+    return spec.rec_in;
+  }
+  public void setReduceInputRecords(long rec_in) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    spec.rec_in = rec_in;
+    setSize(origSize);
+  }
+
+  public long getReduceOutputRecords() {
+    assert REDUCE_SPEC == getType();
+    return spec.rec_out;
+  }
+  public void setReduceOutputRecords(long rec_out) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    spec.rec_out = rec_out;
+    setSize(origSize);
+  }
+
+  public long getReduceOutputBytes() {
+    assert REDUCE_SPEC == getType();
+    return spec.bytes_out;
+  }
+  public void setReduceOutputBytes(long b_out) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    spec.bytes_out = b_out;
+    setSize(origSize);
+  }
+
+  public byte getType() {
+    return type;
+  }
+  public void setType(byte type) throws IOException {
+    final int origSize = getSize();
+    switch (type) {
+      case REDUCE_SPEC:
+      case DATA:
+        this.type = type;
+        break;
+      default:
+        throw new IOException("Invalid type: " + type);
+    }
+    setSize(origSize);
+  }
+
+  public void setSpec(Spec spec) {
+    assert REDUCE_SPEC == getType();
+    final int origSize = getSize();
+    this.spec.set(spec);
+    setSize(origSize);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    setType(in.readByte());
+    if (REDUCE_SPEC == getType()) {
+      spec.readFields(in);
+    }
+  }
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    final byte t = getType();
+    out.writeByte(t);
+    if (REDUCE_SPEC == t) {
+      spec.write(out);
+    }
+  }
+  int fixedBytes() {
+    return super.fixedBytes() +
+      (REDUCE_SPEC == getType() ? spec.getSize() : 0) + META_BYTES;
+  }
+  @Override
+  public int compareTo(GridmixRecord other) {
+    final GridmixKey o = (GridmixKey) other;
+    final byte t1 = getType();
+    final byte t2 = o.getType();
+    if (t1 != t2) {
+      return t1 - t2;
+    }
+    return super.compareTo(other);
+  }
+
+  /**
+   * Note that while the spec is not explicitly included, changing the spec
+   * may change its size, which will affect equality.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other != null && other.getClass() == getClass()) {
+      final GridmixKey o = ((GridmixKey)other);
+      return getType() == o.getType() && super.equals(o);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode() ^ getType();
+  }
+
+  public static class Spec implements Writable {
+    long rec_in;
+    long rec_out;
+    long bytes_out;
+    public Spec() { }
+
+    public void set(Spec other) {
+      rec_in = other.rec_in;
+      bytes_out = other.bytes_out;
+      rec_out = other.rec_out;
+    }
+
+    public int getSize() {
+      return WritableUtils.getVIntSize(rec_in) +
+             WritableUtils.getVIntSize(rec_out) +
+             WritableUtils.getVIntSize(bytes_out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      rec_in = WritableUtils.readVLong(in);
+      rec_out = WritableUtils.readVLong(in);
+      bytes_out = WritableUtils.readVLong(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeVLong(out, rec_in);
+      WritableUtils.writeVLong(out, rec_out);
+      WritableUtils.writeVLong(out, bytes_out);
+    }
+  }
+
+  public static class Comparator extends GridmixRecord.Comparator {
+
+    private final DataInputBuffer di = new DataInputBuffer();
+    private final byte[] reset = di.getData();
+
+    public Comparator() {
+      super(GridmixKey.class);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try {
+        di.reset(b1, s1, l1);
+        final int x1 = WritableUtils.readVInt(di);
+        di.reset(b2, s2, l2);
+        final int x2 = WritableUtils.readVInt(di);
+        final int ret = (b1[s1 + x1] != b2[s2 + x2])
+          ? b1[s1 + x1] - b2[s2 + x2]
+          : super.compare(b1, s1, x1, b2, s2, x2);
+        di.reset(reset, 0, 0);
+        return ret;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    static {
+      WritableComparator.define(GridmixKey.class, new Comparator());
+    }
+  }
+}
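GridmixKey.Spec above stores its three counters as WritableUtils vlongs, so Spec.getSize() depends on the magnitude of the values, and the raw comparators must skip over a variable-width region. A small round-trip of the same encoding, using only Hadoop io classes already imported above (the buffer names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.WritableUtils;

    // Round-trips three counters through the vint/vlong encoding Spec uses;
    // small values cost a single byte each, which is why getSize() varies.
    public class VLongDemo {
      public static void main(String[] args) throws IOException {
        final long recIn = 42L, recOut = 7L, bytesOut = 1L << 20;
        final DataOutputBuffer out = new DataOutputBuffer();
        WritableUtils.writeVLong(out, recIn);
        WritableUtils.writeVLong(out, recOut);
        WritableUtils.writeVLong(out, bytesOut);
        System.out.println("encoded size: " + out.getLength());
        final DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), 0, out.getLength());
        System.out.println(WritableUtils.readVLong(in));  // 42
        System.out.println(WritableUtils.readVLong(in));  // 7
        System.out.println(WritableUtils.readVLong(in));  // 1048576
      }
    }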
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+class GridmixRecord implements WritableComparable<GridmixRecord> {
+
+  private static final int FIXED_BYTES = 1;
+  private int size = -1;
+  private long seed;
+  private final DataInputBuffer dib =
+    new DataInputBuffer();
+  private final DataOutputBuffer dob =
+    new DataOutputBuffer(Long.SIZE / Byte.SIZE);
+  private byte[] literal = dob.getData();
+
+  GridmixRecord() {
+    this(1, 0L);
+  }
+
+  GridmixRecord(int size, long seed) {
+    this.seed = seed;
+    setSizeInternal(size);
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  public void setSize(int size) {
+    setSizeInternal(size);
+  }
+
+  private void setSizeInternal(int size) {
+    this.size = Math.max(1, size);
+    try {
+      seed = maskSeed(seed, this.size);
+      dob.reset();
+      dob.writeLong(seed);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public final void setSeed(long seed) {
+    this.seed = seed;
+  }
+
+  /** Marsaglia, 2003. */
+  long nextRand(long x) {
+    x ^= (x << 13);
+    x ^= (x >>> 7);
+    return (x ^= (x << 17));
+  }
+
+  public void writeRandom(DataOutput out, final int size) throws IOException {
+    long tmp = seed;
+    out.writeLong(tmp);
+    int i = size - (Long.SIZE / Byte.SIZE);
+    while (i > Long.SIZE / Byte.SIZE - 1) {
+      tmp = nextRand(tmp);
+      out.writeLong(tmp);
+      i -= Long.SIZE / Byte.SIZE;
+    }
+    for (tmp = nextRand(tmp); i > 0; --i) {
+      out.writeByte((int)(tmp & 0xFF));
+      tmp >>>= Byte.SIZE;
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    size = WritableUtils.readVInt(in);
+    int payload = size - WritableUtils.getVIntSize(size);
+    if (payload > Long.SIZE / Byte.SIZE) {
+      seed = in.readLong();
+      payload -= Long.SIZE / Byte.SIZE;
+    } else {
+      Arrays.fill(literal, (byte)0);
+      in.readFully(literal, 0, payload);
+      dib.reset(literal, 0, literal.length);
+      seed = dib.readLong();
+      payload = 0;
+    }
+    final int vBytes = in.skipBytes(payload);
+    if (vBytes != payload) {
+      throw new EOFException("Expected " + payload + ", read " + vBytes);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // data bytes including vint encoding
+    WritableUtils.writeVInt(out, size);
+    final int payload = size - WritableUtils.getVIntSize(size);
+    if (payload > Long.SIZE / Byte.SIZE) {
+      writeRandom(out, payload);
+    } else if (payload > 0) {
+      out.write(literal, 0, payload);
+    }
+  }
+
+  @Override
+  public int compareTo(GridmixRecord other) {
+    return compareSeed(other.seed,
+        Math.max(0, other.getSize() - other.fixedBytes()));
+  }
+
+  int fixedBytes() {
+    // min vint size
+    return FIXED_BYTES;
+  }
+
+  private static long maskSeed(long sd, int sz) {
+    // Don't use fixedBytes here; subclasses will set intended random len
+    if (sz <= FIXED_BYTES) {
+      sd = 0L;
+    } else if (sz < Long.SIZE / Byte.SIZE + FIXED_BYTES) {
+      final int tmp = sz - FIXED_BYTES;
+      final long mask = (1L << (Byte.SIZE * tmp)) - 1;
+      sd &= mask << (Byte.SIZE * (Long.SIZE / Byte.SIZE - tmp));
+    }
+    return sd;
+  }
+
+  int compareSeed(long jSeed, int jSize) {
+    final int iSize = Math.max(0, getSize() - fixedBytes());
+    final int seedLen = Math.min(iSize, jSize) + FIXED_BYTES;
+    jSeed = maskSeed(jSeed, seedLen);
+    long iSeed = maskSeed(seed, seedLen);
+    final int cmplen = Math.min(iSize, jSize);
+    for (int i = 0; i < cmplen; i += Byte.SIZE) {
+      final int k = cmplen - i;
+      for (long j = Long.SIZE - Byte.SIZE;
+          j >= Math.max(0, Long.SIZE / Byte.SIZE - k) * Byte.SIZE;
+          j -= Byte.SIZE) {
+        final int xi = (int)((iSeed >>> j) & 0xFFL);
+        final int xj = (int)((jSeed >>> j) & 0xFFL);
+        if (xi != xj) {
+          return xi - xj;
+        }
+      }
+      iSeed = nextRand(iSeed);
+      jSeed = nextRand(jSeed);
+    }
+    return iSize - jSize;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other != null && other.getClass() == getClass()) {
+      final GridmixRecord o = ((GridmixRecord)other);
+      return getSize() == o.getSize() && seed == o.seed;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)(seed * getSize());
+  }
+
+  public static class Comparator extends WritableComparator {
+
+    public Comparator() {
+      super(GridmixRecord.class);
+    }
+
+    public Comparator(Class<? extends WritableComparable<?>> sub) {
+      super(sub);
+    }
+
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+      n1 -= WritableUtils.getVIntSize(n1);
+      n2 -= WritableUtils.getVIntSize(n2);
+      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
+    }
+
+    static {
+      WritableComparator.define(GridmixRecord.class, new Comparator());
+    }
+  }
+
+}
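GridmixRecord.nextRand() is the Marsaglia (2003) xorshift generator cited in its comment. Every byte after a record's leading eight seed bytes is a deterministic function of that seed, which is why writeRandom() can materialize a payload of any size from one stored long, and why compareSeed() can compare two records without buffering either. The generator in isolation, as a sketch:

    // Marsaglia xorshift, as in GridmixRecord.nextRand: the whole payload is
    // a deterministic function of the leading 8-byte seed, so a record can be
    // regenerated by re-running the generator from the seed.
    public class XorshiftDemo {
      static long nextRand(long x) {
        x ^= (x << 13);
        x ^= (x >>> 7);
        return (x ^= (x << 17));
      }
      public static void main(String[] args) {
        long a = 0xDEADBEEFL, b = 0xDEADBEEFL;
        for (int i = 0; i < 4; ++i) {
          a = nextRand(a);
          b = nextRand(b);
          // identical seeds yield identical streams
          System.out.printf("%016x %016x%n", a, b);
        }
      }
    }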
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+
+class GridmixSplit extends CombineFileSplit {
+  private int id;
+  private int nSpec;
+  private int maps;
+  private int reduces;
+  private long inputRecords;
+  private long outputBytes;
+  private long outputRecords;
+  private long maxMemory;
+  private double[] reduceBytes = new double[0];
+  private double[] reduceRecords = new double[0];
+
+  // Spec for reduces id mod this
+  private long[] reduceOutputBytes = new long[0];
+  private long[] reduceOutputRecords = new long[0];
+
+  GridmixSplit() {
+    super();
+  }
+
+  public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
+      long inputBytes, long inputRecords, long outputBytes,
+      long outputRecords, double[] reduceBytes, double[] reduceRecords,
+      long[] reduceOutputBytes, long[] reduceOutputRecords)
+      throws IOException {
+    super(cfsplit);
+    this.id = id;
+    this.maps = maps;
+    reduces = reduceBytes.length;
+    this.inputRecords = inputRecords;
+    this.outputBytes = outputBytes;
+    this.outputRecords = outputRecords;
+    this.reduceBytes = reduceBytes;
+    this.reduceRecords = reduceRecords;
+    nSpec = reduceOutputBytes.length;
+    this.reduceOutputBytes = reduceOutputBytes;
+    this.reduceOutputRecords = reduceOutputRecords;
+  }
+  public int getId() {
+    return id;
+  }
+  public int getMapCount() {
+    return maps;
+  }
+  public long getInputRecords() {
+    return inputRecords;
+  }
+  public long[] getOutputBytes() {
+    if (0 == reduces) {
+      return new long[] { outputBytes };
+    }
+    final long[] ret = new long[reduces];
+    for (int i = 0; i < reduces; ++i) {
+      ret[i] = Math.round(outputBytes * reduceBytes[i]);
+    }
+    return ret;
+  }
+  public long[] getOutputRecords() {
+    if (0 == reduces) {
+      return new long[] { outputRecords };
+    }
+    final long[] ret = new long[reduces];
+    for (int i = 0; i < reduces; ++i) {
+      ret[i] = Math.round(outputRecords * reduceRecords[i]);
+    }
+    return ret;
+  }
+  public long getReduceBytes(int i) {
+    return reduceOutputBytes[i];
+  }
+  public long getReduceRecords(int i) {
+    return reduceOutputRecords[i];
+  }
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    WritableUtils.writeVInt(out, id);
+    WritableUtils.writeVInt(out, maps);
+    WritableUtils.writeVLong(out, inputRecords);
+    WritableUtils.writeVLong(out, outputBytes);
+    WritableUtils.writeVLong(out, outputRecords);
+    WritableUtils.writeVLong(out, maxMemory);
+    WritableUtils.writeVInt(out, reduces);
+    for (int i = 0; i < reduces; ++i) {
+      out.writeDouble(reduceBytes[i]);
+      out.writeDouble(reduceRecords[i]);
+    }
+    WritableUtils.writeVInt(out, nSpec);
+    for (int i = 0; i < nSpec; ++i) {
+      WritableUtils.writeVLong(out, reduceOutputBytes[i]);
+      WritableUtils.writeVLong(out, reduceOutputRecords[i]);
+    }
+  }
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    id = WritableUtils.readVInt(in);
+    maps = WritableUtils.readVInt(in);
+    inputRecords = WritableUtils.readVLong(in);
+    outputBytes = WritableUtils.readVLong(in);
+    outputRecords = WritableUtils.readVLong(in);
+    maxMemory = WritableUtils.readVLong(in);
+    reduces = WritableUtils.readVInt(in);
+    if (reduceBytes.length < reduces) {
+      reduceBytes = new double[reduces];
+      reduceRecords = new double[reduces];
+    }
+    for (int i = 0; i < reduces; ++i) {
+      reduceBytes[i] = in.readDouble();
+      reduceRecords[i] = in.readDouble();
+    }
+    nSpec = WritableUtils.readVInt(in);
+    if (reduceOutputBytes.length < nSpec) {
+      reduceOutputBytes = new long[nSpec];
+      reduceOutputRecords = new long[nSpec];
+    }
+    for (int i = 0; i < nSpec; ++i) {
+      reduceOutputBytes[i] = WritableUtils.readVLong(in);
+      reduceOutputRecords[i] = WritableUtils.readVLong(in);
+    }
+  }
+}
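getReduceBytes()/getReduceRecords() above are indexed by the convention flagged in the field comment ("Spec for reduces id mod this"): map i carries the output specs for reduces i, i + maps, i + 2 * maps, and so on. A small check of the same arithmetic used by GridmixJob.buildSplits(), with sample counts:

    // Mirrors the assignment in GridmixJob.buildSplits(): map i carries the
    // reduce specs for reduces i, i + maps, i + 2*maps, ...
    public class SpecAssignmentDemo {
      public static void main(String[] args) {
        final int maps = 4, reds = 6;
        for (int i = 0; i < maps; ++i) {
          final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
          System.out.print("map " + i + " specs:");
          for (int j = 0; j < nSpec; ++j) {
            System.out.print(" " + (i + j * maps));
          }
          System.out.println();
        }
        // prints: map 0 specs: 0 4 / map 1 specs: 1 5 / map 2 specs: 2
        //         map 3 specs: 3  (every reduce covered exactly once)
      }
    }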
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Given a {@link FilePool}, obtain a set of files capable of satisfying
+ * a full set of splits, then iterate over each source to fill the request.
+ */
+class InputStriper {
+  public static final Log LOG = LogFactory.getLog(InputStriper.class);
+  int idx;
+  long currentStart;
+  FileStatus current;
+  final List<FileStatus> files = new ArrayList<FileStatus>();
+
+  /**
+   * @param inputDir Pool from which files are requested.
+   * @param mapBytes Sum of all expected split requests.
+   */
+  InputStriper(FilePool inputDir, long mapBytes)
+      throws IOException {
+    final long inputBytes = inputDir.getInputFiles(mapBytes, files);
+    if (mapBytes > inputBytes) {
+      LOG.warn("Using " + inputBytes + "/" + mapBytes + " bytes");
+    }
+    if (files.isEmpty() && mapBytes > 0) {
+      throw new IOException("Failed to satisfy request for " + mapBytes);
+    }
+    current = files.isEmpty() ? null : files.get(0);
+  }
+
+  /**
+   * @param inputDir Pool used to resolve block locations.
+   * @param bytes Target byte count
+   * @param nLocs Number of block locations per split.
+   * @return A set of files satisfying the byte count, with locations weighted
+   *         to the dominating proportion of input bytes.
+   */
+  CombineFileSplit splitFor(FilePool inputDir, long bytes, int nLocs)
+      throws IOException {
+    final ArrayList<Path> paths = new ArrayList<Path>();
+    final ArrayList<Long> start = new ArrayList<Long>();
+    final ArrayList<Long> length = new ArrayList<Long>();
+    final HashMap<String,Double> sb = new HashMap<String,Double>();
+    do {
+      paths.add(current.getPath());
+      start.add(currentStart);
+      final long fromFile = Math.min(bytes, current.getLen() - currentStart);
+      length.add(fromFile);
+      for (BlockLocation loc :
+          inputDir.locationsFor(current, currentStart, fromFile)) {
+        final double tedium = loc.getLength() / (1.0 * bytes);
+        for (String l : loc.getHosts()) {
+          Double j = sb.get(l);
+          if (null == j) {
+            sb.put(l, tedium);
+          } else {
+            sb.put(l, j.doubleValue() + tedium);
+          }
+        }
+      }
+      currentStart += fromFile;
+      bytes -= fromFile;
+      if (current.getLen() - currentStart == 0) {
+        current = files.get(++idx % files.size());
+        currentStart = 0;
+      }
+    } while (bytes > 0);
+    final ArrayList<Entry<String,Double>> sort =
+      new ArrayList<Entry<String,Double>>(sb.entrySet());
+    Collections.sort(sort, hostRank);
+    final String[] hosts = new String[Math.min(nLocs, sort.size())];
+    for (int i = 0; i < nLocs && i < sort.size(); ++i) {
+      hosts[i] = sort.get(i).getKey();
+    }
+    return new CombineFileSplit(paths.toArray(new Path[0]),
+        toLongArray(start), toLongArray(length), hosts);
+  }
+
+  private long[] toLongArray(final ArrayList<Long> sigh) {
+    final long[] ret = new long[sigh.size()];
+    for (int i = 0; i < ret.length; ++i) {
+      ret[i] = sigh.get(i);
+    }
+    return ret;
+  }
+
+  static final Comparator<Entry<String,Double>> hostRank =
+    new Comparator<Entry<String,Double>>() {
+      public int compare(Entry<String,Double> a, Entry<String,Double> b) {
+        final double va = a.getValue();
+        final double vb = b.getValue();
+        return va > vb ? -1 : va < vb ? 1 : 0;
+      }
+    };
+}
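InputStriper.splitFor() above weights each host by the fraction of the requested bytes it serves, sorts descending with hostRank, and keeps the nLocs heaviest hosts as the split's locations. The ranking step in miniature, with made-up host names:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;

    // Miniature of splitFor's host selection: accumulate a byte-weight per
    // host, sort descending, keep the top nLocs entries.
    public class HostRankDemo {
      public static void main(String[] args) {
        final Map<String,Double> weight = new HashMap<String,Double>();
        weight.put("rackA-node1", 0.6);
        weight.put("rackB-node7", 0.3);
        weight.put("rackC-node2", 0.1);
        final ArrayList<Map.Entry<String,Double>> sorted =
          new ArrayList<Map.Entry<String,Double>>(weight.entrySet());
        Collections.sort(sorted, new Comparator<Map.Entry<String,Double>>() {
          public int compare(Map.Entry<String,Double> a,
                             Map.Entry<String,Double> b) {
            final double va = a.getValue();
            final double vb = b.getValue();
            return va > vb ? -1 : va < vb ? 1 : 0;
          }
        });
        final int nLocs = 2;
        for (int i = 0; i < nLocs && i < sorted.size(); ++i) {
          System.out.println(sorted.get(i).getKey());
        }
        // prints rackA-node1, then rackB-node7
      }
    }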
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Factory passing reduce specification as its last record.
+ */
+class IntermediateRecordFactory extends RecordFactory {
+
+  private final GridmixKey.Spec spec;
+  private final RecordFactory factory;
+  private final int partition;
+  private final long targetRecords;
+  private boolean done = false;
+  private long accRecords = 0L;
+
+  /**
+   * @param targetBytes Expected byte count.
+   * @param targetRecords Expected record count; will emit spec records after
+   *                      this boundary is passed.
+   * @param partition Reduce to which records are emitted.
+   * @param spec Specification to emit.
+   * @param conf Unused.
+   */
+  public IntermediateRecordFactory(long targetBytes, long targetRecords,
+      int partition, GridmixKey.Spec spec, Configuration conf) {
+    this(new AvgRecordFactory(targetBytes, targetRecords, conf), partition,
+        targetRecords, spec, conf);
+  }
+
+  /**
+   * @param factory Factory from which byte/record counts are obtained.
+   * @param partition Reduce to which records are emitted.
+   * @param targetRecords Expected record count; will emit spec records after
+   *                      this boundary is passed.
+   * @param spec Specification to emit.
+   * @param conf Unused.
+   */
+  public IntermediateRecordFactory(RecordFactory factory, int partition,
+      long targetRecords, GridmixKey.Spec spec, Configuration conf) {
+    this.spec = spec;
+    this.factory = factory;
+    this.partition = partition;
+    this.targetRecords = targetRecords;
+  }
+
+  @Override
+  public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+    assert key != null;
+    final boolean rslt = factory.next(key, val);
+    ++accRecords;
+    if (rslt) {
+      if (accRecords < targetRecords) {
+        key.setType(GridmixKey.DATA);
+      } else {
+        final int orig = key.getSize();
+        key.setType(GridmixKey.REDUCE_SPEC);
+        spec.rec_in = accRecords;
+        key.setSpec(spec);
+        val.setSize(val.getSize() - (key.getSize() - orig));
+        // reset counters
+        accRecords = 0L;
+        spec.bytes_out = 0L;
+        spec.rec_out = 0L;
+        done = true;
+      }
+    } else if (!done) {
+      // ensure spec emitted
+      key.setType(GridmixKey.REDUCE_SPEC);
+      key.setPartition(partition);
+      key.setSize(0);
+      val.setSize(0);
+      spec.rec_in = 0L;
+      key.setSpec(spec);
+      done = true;
+      return true;
+    }
+    key.setPartition(partition);
+    return rslt;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return factory.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    factory.close();
+  }
+}
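The invariant IntermediateRecordFactory maintains is that each partition sees exactly one REDUCE_SPEC record, emitted at (or, for an empty stream, in place of) the end of the data. A simplified sketch of that shape; Item and withSpec are hypothetical stand-ins, not the GridmixKey/GridmixRecord machinery defined in the files above:

    import java.util.Iterator;

    public class SpecLastDemo {
      static class Item {
        final boolean isSpec;
        final int payload;
        Item(boolean isSpec, int payload) {
          this.isSpec = isSpec;
          this.payload = payload;
        }
      }
      // Wraps a data iterator; once the data runs out, emits exactly one
      // trailing spec item, mirroring the "ensure spec emitted" branch of
      // IntermediateRecordFactory.next().
      static Iterator<Item> withSpec(final Iterator<Integer> data) {
        return new Iterator<Item>() {
          boolean done = false;
          public boolean hasNext() { return data.hasNext() || !done; }
          public Item next() {
            if (data.hasNext()) {
              return new Item(false, data.next());
            }
            done = true;
            return new Item(true, 0); // the spec record
          }
          public void remove() { throw new UnsupportedOperationException(); }
        };
      }
      public static void main(String[] args) {
        Iterator<Item> it =
          withSpec(java.util.Arrays.asList(1, 2, 3).iterator());
        while (it.hasNext()) {
          Item i = it.next();
          System.out.println(i.isSpec ? "SPEC" : "data " + i.payload);
        }
      }
    }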
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.tools.rumen.ZombieJobProducer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * Component reading job traces generated by Rumen. Each job in the trace is
+ * assigned a sequence number and given a submission time relative to the
+ * job that preceded it. Jobs are enqueued in the JobSubmitter provided at
+ * construction.
+ * @see org.apache.hadoop.tools.rumen.HadoopLogsAnalyzer
+ */
+class JobFactory implements Gridmix.Component<Void> {
+
+  public static final Log LOG = LogFactory.getLog(JobFactory.class);
+
+  private final Path scratch;
+  private final float rateFactor;
+  private final Configuration conf;
+  private final ReaderThread rThread;
+  private final AtomicInteger sequence;
+  private final JobSubmitter submitter;
+  private final CountDownLatch startFlag;
+  private volatile IOException error = null;
+  protected final JobStoryProducer jobProducer;
+
+  /**
+   * Creating a new instance does not start the thread.
+   * @param submitter Component to which deserialized jobs are passed
+   * @param jobTrace Stream of job traces with which to construct a
+   *                 {@link org.apache.hadoop.tools.rumen.ZombieJobProducer}
+   * @param scratch Directory into which to write output from simulated jobs
+   * @param conf Config passed to all jobs to be submitted
+   * @param startFlag Latch released from main to start pipeline
+   */
+  public JobFactory(JobSubmitter submitter, InputStream jobTrace,
+      Path scratch, Configuration conf, CountDownLatch startFlag)
+      throws IOException {
+    this(submitter, new ZombieJobProducer(jobTrace, null), scratch, conf,
+        startFlag);
+  }
+
+  /**
+   * Constructor permitting JobStoryProducer to be mocked.
+   * @param submitter Component to which deserialized jobs are passed
+   * @param jobProducer Producer generating JobStory objects.
+   * @param scratch Directory into which to write output from simulated jobs
+   * @param conf Config passed to all jobs to be submitted
+   * @param startFlag Latch released from main to start pipeline
+   */
+  protected JobFactory(JobSubmitter submitter, JobStoryProducer jobProducer,
+      Path scratch, Configuration conf, CountDownLatch startFlag) {
+    sequence = new AtomicInteger(0);
+    this.scratch = scratch;
+    this.rateFactor = conf.getFloat(Gridmix.GRIDMIX_SUB_MUL, 1.0f);
+    this.jobProducer = jobProducer;
+    this.conf = new Configuration(conf);
+    this.submitter = submitter;
+    this.startFlag = startFlag;
+    this.rThread = new ReaderThread();
+  }
+
+  static class MinTaskInfo extends TaskInfo {
+    public MinTaskInfo(TaskInfo info) {
+      super(info.getInputBytes(), info.getInputRecords(),
+            info.getOutputBytes(), info.getOutputRecords(),
+            info.getTaskMemory());
+    }
+    public long getInputBytes() {
+      return Math.max(0, super.getInputBytes());
+    }
+    public int getInputRecords() {
+      return Math.max(0, super.getInputRecords());
+    }
+    public long getOutputBytes() {
+      return Math.max(0, super.getOutputBytes());
+    }
+    public int getOutputRecords() {
+      return Math.max(0, super.getOutputRecords());
+    }
+    public long getTaskMemory() {
+      return Math.max(0, super.getTaskMemory());
+    }
+  }
+
+  static class FilterJobStory implements JobStory {
+
+    protected final JobStory job;
+
+    public FilterJobStory(JobStory job) {
+      this.job = job;
+    }
+    public JobConf getJobConf() { return job.getJobConf(); }
+    public String getName() { return job.getName(); }
+    public JobID getJobID() { return job.getJobID(); }
+    public String getUser() { return job.getUser(); }
+    public long getSubmissionTime() { return job.getSubmissionTime(); }
+    public InputSplit[] getInputSplits() { return job.getInputSplits(); }
+    public int getNumberMaps() { return job.getNumberMaps(); }
+    public int getNumberReduces() { return job.getNumberReduces(); }
+    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+      return job.getTaskInfo(taskType, taskNumber);
+    }
+    public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
+        int taskAttemptNumber) {
+      return job.getTaskAttemptInfo(taskType, taskNumber, taskAttemptNumber);
+    }
+    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(
+        int taskNumber, int taskAttemptNumber, int locality) {
+      return job.getMapTaskAttemptInfoAdjusted(
+          taskNumber, taskAttemptNumber, locality);
+    }
+    public Values getOutcome() {
+      return job.getOutcome();
+    }
+  }
+
+  /**
+   * Worker thread responsible for reading descriptions, assigning sequence
+   * numbers, and normalizing time.
+   */
+  private class ReaderThread extends Thread {
+
+    public ReaderThread() {
+      super("GridmixJobFactory");
+    }
+
+    private JobStory getNextJobFiltered() throws IOException {
+      JobStory job;
+      do {
+        job = jobProducer.getNextJob();
+      } while (job != null
+          && (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
+              job.getSubmissionTime() < 0));
+      return null == job ? null : new FilterJobStory(job) {
+          @Override
+          public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+            return new MinTaskInfo(this.job.getTaskInfo(taskType, taskNumber));
+          }
+        };
+    }
+
+    @Override
+    public void run() {
+      try {
+        startFlag.await();
+        if (Thread.currentThread().isInterrupted()) {
+          return;
+        }
+        final long initTime = TimeUnit.MILLISECONDS.convert(
+            System.nanoTime(), TimeUnit.NANOSECONDS);
+        LOG.debug("START @ " + initTime);
+        long first = -1;
+        long last = -1;
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            final JobStory job = getNextJobFiltered();
+            if (null == job) {
+              return;
+            }
+            if (first < 0) {
+              first = job.getSubmissionTime();
+            }
+            final long current = job.getSubmissionTime();
+            if (current < last) {
+              LOG.warn("Job " + job.getJobID() + " out of order");
+              continue;
+            }
+            last = current;
+            submitter.add(new GridmixJob(conf, initTime
+                  + Math.round(rateFactor * (current - first)),
+                  job, scratch, sequence.getAndIncrement()));
+          } catch (IOException e) {
+            JobFactory.this.error = e;
+            return;
+          }
+        }
+      } catch (InterruptedException e) {
+        // exit thread; ignore any jobs remaining in the trace
+        return;
+      } finally {
+        IOUtils.cleanup(null, jobProducer);
+      }
+    }
+  }
+
+  /**
+   * Obtain the error that caused the thread to exit unexpectedly.
+   */
+  public IOException error() {
+    return error;
+  }
+
+  /**
+   * Add is disabled.
+   * @throws UnsupportedOperationException
+   */
+  public void add(Void ignored) {
+    throw new UnsupportedOperationException(getClass().getName() +
+        " is at the start of the pipeline and accepts no events");
+  }
+
+  /**
+   * Start the reader thread, wait for latch if necessary.
+   */
+  public void start() {
+    rThread.start();
+  }
+
+  /**
+   * Wait for the reader thread to exhaust the job trace.
+   */
+  public void join(long millis) throws InterruptedException {
+    rThread.join(millis);
+  }
+
+  /**
+   * Interrupt the reader thread.
+   */
+  public void shutdown() {
+    rThread.interrupt();
+  }
+
+  /**
+   * Interrupt the reader thread. This requires no special consideration, as
+   * the thread has no pending work queue.
+   */
+  public void abort() {
+    // Currently no special work
+    rThread.interrupt();
+  }
+
+}
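ReaderThread.run() above rebases trace time onto simulation time: the first eligible job is submitted at initTime, and every later job keeps its offset from that first job, scaled by rateFactor (read from the Gridmix.GRIDMIX_SUB_MUL setting in the constructor). The arithmetic on sample values, as a sketch:

    // Rebasing of trace submission times as in JobFactory.ReaderThread.run():
    // the first trace job submits at initTime; later jobs keep their relative
    // offsets, scaled by rateFactor (0.5 here halves every gap).
    public class SubmissionTimeDemo {
      public static void main(String[] args) {
        final long initTime = 100000L;       // simulation start, ms
        final double rateFactor = 0.5;
        final long[] trace = { 7000L, 9000L, 13000L }; // trace times, ms
        final long first = trace[0];
        for (long current : trace) {
          final long target =
            initTime + Math.round(rateFactor * (current - first));
          System.out.println(current + " -> " + target);
        }
        // prints 7000 -> 100000, 9000 -> 101000, 13000 -> 103000
      }
    }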
+
+  /**
+   * Temporary hook for recording job success.
+   */
+  protected void onSuccess(Job job) {
+    LOG.info(job.getJobName() + " (" + job.getJobID() + ")" + " success");
+  }
+
+  /**
+   * Temporary hook for recording job failure.
+   */
+  protected void onFailure(Job job) {
+    LOG.info(job.getJobName() + " (" + job.getJobID() + ")" + " failure");
+  }
+
+  /**
+   * If shutdown before all jobs have completed, any still-running jobs
+   * may be extracted from the component. Logs a warning if the monitoring
+   * thread is still running.
+   * @return Any jobs submitted and not known to have completed.
+   */
+  List<Job> getRemainingJobs() {
+    if (mThread.isAlive()) {
+      LOG.warn("Internal error: Polling running monitor for jobs");
+    }
+    synchronized (mJobs) {
+      return new ArrayList<Job>(mJobs);
+    }
+  }
+
+  /**
+   * Monitoring thread pulling running jobs from the component and into
+   * a queue to be polled for status.
+   */
+  private class MonitorThread extends Thread {
+
+    public MonitorThread() {
+      super("GridmixJobMonitor");
+    }
+
+    /**
+     * Check a job for success or failure.
+     */
+    public void process(Job job) throws IOException, InterruptedException {
+      if (job.isSuccessful()) {
+        onSuccess(job);
+      } else {
+        onFailure(job);
+      }
+    }
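+
+    // Each sweep of the loop below: drain newly submitted jobs into mJobs,
+    // honor any shutdown/abort request, then poll queued jobs, processing
+    // completed ones until the first still-running job is found and
+    // re-queued; sleep pollDelayMillis between sweeps.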
+
+    @Override
+    public void run() {
+      boolean graceful;
+      boolean shutdown;
+      while (true) {
+        try {
+          synchronized (mJobs) {
+            graceful = JobMonitor.this.graceful;
+            shutdown = JobMonitor.this.shutdown;
+            runningJobs.drainTo(mJobs);
+          }
+
+          // shutdown conditions; either shutdown requested and all jobs
+          // have completed or abort requested and there are recently
+          // submitted jobs not in the monitored set
+          if (shutdown) {
+            if (!graceful) {
+              while (!runningJobs.isEmpty()) {
+                synchronized (mJobs) {
+                  runningJobs.drainTo(mJobs);
+                }
+              }
+              break;
+            } else if (mJobs.isEmpty()) {
+              break;
+            }
+          }
+          while (!mJobs.isEmpty()) {
+            Job job;
+            synchronized (mJobs) {
+              job = mJobs.poll();
+            }
+            try {
+              if (job.isComplete()) {
+                process(job);
+                continue;
+              }
+            } catch (IOException e) {
+              if (e.getCause() instanceof ClosedByInterruptException) {
+                // Job doesn't throw InterruptedException, but RPC socket
+                // layer is blocking and may throw a wrapped Exception if
+                // this thread is interrupted. Since the lower level cleared
+                // the flag, reset it here
+                Thread.currentThread().interrupt();
+              } else {
+                LOG.warn("Lost job " + (null == job.getJobName()
+                    ? "" : job.getJobName()), e);
+                continue;
+              }
+            }
+            synchronized (mJobs) {
+              if (!mJobs.offer(job)) {
+                LOG.error("Lost job " + (null == job.getJobName()
+                    ? "" : job.getJobName())); // should never happen
+              }
+            }
+            break;
+          }
+          try {
+            TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
+          } catch (InterruptedException e) {
+            shutdown = true;
+            continue;
+          }
+        } catch (Throwable e) {
+          LOG.warn("Unexpected exception: ", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Start the internal, monitoring thread.
+   */
+  public void start() {
+    mThread.start();
+  }
+
+  /**
+   * Wait for the monitor to halt, assuming shutdown or abort have been
+   * called. Note that, since submission may be sporadic, this will hang
+   * if no form of shutdown has been requested.
+   */
+  public void join(long millis) throws InterruptedException {
+    mThread.join(millis);
+  }
+
+  /**
+   * Drain all submitted jobs to a queue and stop the monitoring thread.
+   * Upstream submitter is assumed dead.
+   */
+  public void abort() {
+    synchronized (mJobs) {
+      graceful = false;
+      shutdown = true;
+    }
+    mThread.interrupt();
+  }
+
+  /**
+   * When all monitored jobs have completed, stop the monitoring thread.
+   * Upstream submitter is assumed dead.
+   */
+  public void shutdown() {
+    synchronized (mJobs) {
+      graceful = true;
+      shutdown = true;
+    }
+    mThread.interrupt();
+  }
+}
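
The onSuccess/onFailure methods above are explicitly temporary hooks, and
overriding them is the natural way to collect statistics. A hypothetical
sketch; CountingJobMonitor is not part of this commit, and since JobMonitor
is package-private the subclass would have to live in
org.apache.hadoop.mapred.gridmix:

    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.hadoop.mapreduce.Job;

    class CountingJobMonitor extends JobMonitor {
      private final AtomicInteger succeeded = new AtomicInteger();
      private final AtomicInteger failed = new AtomicInteger();

      @Override
      protected void onSuccess(Job job) {
        succeeded.incrementAndGet();
        super.onSuccess(job);   // keep the existing log line
      }

      @Override
      protected void onFailure(Job job) {
        failed.incrementAndGet();
        super.onFailure(job);
      }

      int succeeded() { return succeeded.get(); }
      int failed() { return failed.get(); }
    }
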

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Component accepting deserialized job traces, computing split data, and
+ * submitting to the cluster on deadline. Each job added from an upstream
+ * factory must be submitted to the cluster by the deadline recorded on it.
+ * Once submitted, jobs must be added to a downstream component for
+ * monitoring.
+ */
+class JobSubmitter implements Gridmix.Component<GridmixJob> {
+
+  public static final Log LOG = LogFactory.getLog(JobSubmitter.class);
+
+  final Semaphore sem;
+  private final FilePool inputDir;
+  private final JobMonitor monitor;
+  private final ExecutorService sched;
+  private volatile boolean shutdown = false;
+
+  /**
+   * Initialize the submission component with downstream monitor and pool of
+   * files from which split data may be read.
+   * @param monitor Monitor component to which jobs should be passed
+   * @param threads Number of submission threads
+   *   See {@link Gridmix#GRIDMIX_SUB_THR}.
+   * @param queueDepth Max depth of pending work queue
+   *   See {@link Gridmix#GRIDMIX_QUE_DEP}.
+   * @param inputDir Set of files from which split data may be mined for
+   *   synthetic jobs.
+   */
+  public JobSubmitter(JobMonitor monitor, int threads, int queueDepth,
+      FilePool inputDir) {
+    sem = new Semaphore(queueDepth);
+    sched = new ThreadPoolExecutor(threads, threads, 0L,
+        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+    this.inputDir = inputDir;
+    this.monitor = monitor;
+  }
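+
+  // Note: sem is sized to queueDepth and bounds the work accepted into the
+  // otherwise unbounded LinkedBlockingQueue backing the pool; a permit is
+  // taken in add() and released when a SubmitTask finishes or is rejected.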
+
+  /**
+   * Runnable wrapping a job to be submitted to the cluster.
+   */
+  private class SubmitTask implements Runnable {
+
+    final GridmixJob job;
+    public SubmitTask(GridmixJob job) {
+      this.job = job;
+    }
+    public void run() {
+      try {
+        // pre-compute split information
+        try {
+          job.buildSplits(inputDir);
+        } catch (IOException e) {
+          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          return;
+        }
+        // Sleep until deadline
+        long nsDelay = job.getDelay(TimeUnit.NANOSECONDS);
+        while (nsDelay > 0) {
+          TimeUnit.NANOSECONDS.sleep(nsDelay);
+          nsDelay = job.getDelay(TimeUnit.NANOSECONDS);
+        }
+        try {
+          // submit job
+          monitor.add(job.call());
+          LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() +
+              " (" + job.getJob().getJobID() + ")");
+        } catch (IOException e) {
+          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          if (e.getCause() instanceof ClosedByInterruptException) {
+            throw new InterruptedException("Failed to submit " +
+                job.getJob().getJobName());
+          }
+        } catch (ClassNotFoundException e) {
+          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+        }
+      } catch (InterruptedException e) {
+        // abort execution, remove splits if necessary
+        // TODO release ThreadLocal
+        GridmixJob.pullDescription(job.id());
+        Thread.currentThread().interrupt();
+        return;
+      } finally {
+        sem.release();
+      }
+    }
+  }
+
+  /**
+   * Enqueue the job to be submitted per the deadline associated with it.
+   */
+  public void add(final GridmixJob job) throws InterruptedException {
+    final boolean addToQueue = !shutdown;
+    if (addToQueue) {
+      final SubmitTask task = new SubmitTask(job);
+      sem.acquire();
+      try {
+        sched.execute(task);
+      } catch (RejectedExecutionException e) {
+        sem.release();
+      }
+    }
+  }
+
+  /**
+   * (Re)scan the set of input files from which splits are derived.
+   */
+  public void refreshFilePool() throws IOException {
+    inputDir.refresh();
+  }
+
+  /**
+   * Does nothing, as the threadpool is already initialized and waiting for
+   * work from the upstream factory.
+   */
+  public void start() { }
+
+  /**
+   * Continue running until all queued jobs have been submitted to the
+   * cluster.
+   */
+  public void join(long millis) throws InterruptedException {
+    if (!shutdown) {
+      throw new IllegalStateException("Cannot wait for active submit thread");
+    }
+    sched.awaitTermination(millis, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Finish all jobs pending submission, but do not accept new work.
+   */
+  public void shutdown() {
+    // complete pending tasks, but accept no new tasks
+    shutdown = true;
+    sched.shutdown();
+  }
+
+  /**
+   * Discard pending work, including precomputed work waiting to be
+   * submitted.
+   */
+  public void abort() {
+    //pendingJobs.clear();
+    shutdown = true;
+    sched.shutdownNow();
+  }
+}
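
The acquire/execute/release sequence split across add() and
SubmitTask.run() above is the classic semaphore-bounded executor pattern:
the producer blocks once queueDepth tasks are in flight, and a permit is
returned whether the task completes or the pool rejects it. The same
pattern in isolation, under the assumption of a fixed pool (names are
illustrative, not from this commit):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.Semaphore;

    public class BoundedExecutor {
      private final ExecutorService pool = Executors.newFixedThreadPool(4);
      private final Semaphore sem = new Semaphore(16); // queued + running

      public void submit(final Runnable work) throws InterruptedException {
        sem.acquire();               // block producer at 16 tasks in flight
        try {
          pool.execute(new Runnable() {
            public void run() {
              try {
                work.run();
              } finally {
                sem.release();       // permit returned when task finishes
              }
            }
          });
        } catch (RejectedExecutionException e) {
          sem.release();             // don't leak the permit on rejection
        }
      }
    }
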

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * For every record consumed, read key + val bytes from the stream provided.
+ */
+class ReadRecordFactory extends RecordFactory {
+
+  /**
+   * Size of internal, scratch buffer to read from internal stream.
+   */
+  public static final String GRIDMIX_READ_BUF_SIZE = "gridmix.read.buffer.size";
+
+  private final byte[] buf;
+  private final InputStream src;
+  private final RecordFactory factory;
+
+  /**
+   * @param targetBytes Expected byte count.
+   * @param targetRecords Expected record count.
+   * @param src Stream to read bytes.
+   * @param conf Used to establish read buffer size. @see #GRIDMIX_READ_BUF_SIZE
+   */
+  public ReadRecordFactory(long targetBytes, long targetRecords,
+      InputStream src, Configuration conf) {
+    this(new AvgRecordFactory(targetBytes, targetRecords, conf), src, conf);
+  }
+
+  /**
+   * @param factory Factory to draw record sizes.
+   * @param src Stream to read bytes.
+   * @param conf Used to establish read buffer size. @see #GRIDMIX_READ_BUF_SIZE
+   */
+  public ReadRecordFactory(RecordFactory factory, InputStream src,
+      Configuration conf) {
+    this.src = src;
+    this.factory = factory;
+    buf = new byte[conf.getInt(GRIDMIX_READ_BUF_SIZE, 64 * 1024)];
+  }
+
+  @Override
+  public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+    if (!factory.next(key, val)) {
+      return false;
+    }
+    for (int len = (null == key ? 0 : key.getSize()) + val.getSize();
+         len > 0; len -= buf.length) {
+      IOUtils.readFully(src, buf, 0, Math.min(buf.length, len));
+    }
+    return true;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return factory.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    IOUtils.cleanup(null, src);
+    factory.close();
+  }
+}
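
The loop in next() above consumes exactly key + val bytes from the stream
through a fixed scratch buffer without retaining them; it decrements len by
buf.length while reading only min(buf.length, len), so len goes negative
only after the final read, and the loop still terminates correctly. The
same skip loop restated as a standalone helper; the class and method names
are illustrative, not from this commit:

    import java.io.IOException;
    import java.io.InputStream;

    public class StreamDrain {
      /** Read and discard exactly len bytes from in, buf.length at a time. */
      static void drain(InputStream in, byte[] buf, long len)
          throws IOException {
        while (len > 0) {
          final int chunk = (int) Math.min(buf.length, len);
          int read = 0;
          while (read < chunk) {        // same contract as IOUtils.readFully
            final int n = in.read(buf, read, chunk - read);
            if (n < 0) {
              throw new IOException("Premature EOF");
            }
            read += n;
          }
          len -= chunk;
        }
      }
    }
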

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Abstract factory for producing records as inputs and outputs to tasks.
+ */
+abstract class RecordFactory implements Closeable {
+
+  /**
+   * Transform the given record or perform some operation.
+   * @return true if the record should be emitted.
+   */
+  public abstract boolean next(GridmixKey key, GridmixRecord val)
+    throws IOException;
+
+  /**
+   * Estimate of exhausted record capacity.
+   */
+  public abstract float getProgress() throws IOException;
+
+}
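
A concrete RecordFactory need only supply next(), getProgress(), and
close() (the last inherited abstract from Closeable). A minimal,
hypothetical implementation that emits a fixed number of records, leaving
key and value sizes at whatever the caller set; FixedCountFactory is not
part of this commit and assumes the org.apache.hadoop.mapred.gridmix
package so that GridmixKey and GridmixRecord resolve:

    package org.apache.hadoop.mapred.gridmix;

    import java.io.IOException;

    class FixedCountFactory extends RecordFactory {
      private final long total;
      private long emitted = 0;

      FixedCountFactory(long total) {
        this.total = total;
      }

      @Override
      public boolean next(GridmixKey key, GridmixRecord val)
          throws IOException {
        return emitted++ < total;   // key/val left untouched
      }

      @Override
      public float getProgress() throws IOException {
        return Math.min(1.0f, emitted / (float) total);
      }

      @Override
      public void close() throws IOException {
        // nothing to release
      }
    }
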