Subject: svn commit: r889778 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/ src/tools/org/apache/hadoop/tools/rumen/
Date: Fri, 11 Dec 2009 19:33:02 -0000
From: cdouglas@apache.org
To: mapreduce-commits@hadoop.apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Message-Id: <20091211193308.5DFAF23888FE@eris.apache.org>

Author: cdouglas
Date: Fri Dec 11 19:32:58 2009
New Revision: 889778

URL: http://svn.apache.org/viewvc?rev=889778&view=rev
Log:
MAPREDUCE-1124. Fix imprecise byte counts in Gridmix.
Added:
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReadRecordFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RecordFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec 11 19:32:58 2009
@@ -992,3 +992,4 @@
     MAPREDUCE-879. Fix broken unit test TestTaskTrackerLocalization on MacOS.
     (Sreekanth Ramakrishnan via yhemanth)
 
+    MAPREDUCE-1124. Fix imprecise byte counts in Gridmix. (cdouglas)
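The heart of the change is the sizing arithmetic in the new AvgRecordFactory
(first diff below). As a sanity check, here is a self-contained sketch of that
arithmetic, using hypothetical targets of 103 bytes over 10 records; class
name and values are illustrative, not part of the commit. It shows how the
division remainder is carried in `step` so the byte total is met exactly:

    // Sketch only: mirrors the constructor and next() arithmetic in the
    // AvgRecordFactory diff below.
    public class SizingSketch {
      public static void main(String[] args) {
        final long targetBytes = 103L;
        final long targetRecords = 10L;
        final long tmp = targetBytes / targetRecords;        // 10
        final long step = targetBytes - targetRecords * tmp; // 3 oversized records
        final int avgrec = (int) Math.min(Integer.MAX_VALUE, tmp + 1); // 11
        long accBytes = 0L;
        long accRecords = 0L;
        while (accBytes < targetBytes) {
          // The first `step` records carry avgrec bytes, the rest avgrec - 1.
          final int reclen = accRecords++ >= step ? avgrec - 1 : avgrec;
          accBytes += Math.min(targetBytes - accBytes, reclen);
        }
        // Prints "10 records, 103 bytes": 3*11 + 7*10 = 103, an exact hit.
        System.out.println(accRecords + " records, " + accBytes + " bytes");
      }
    }

The code this replaces (visible in the GridmixJob.java diff further down)
sized every record by rounding bytes/records, so per-record rounding error
accumulated across a task; carrying the remainder removes that drift.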
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Given byte and record targets, emit roughly equal-sized records satisfying
+ * the contract.
+ */
+class AvgRecordFactory extends RecordFactory {
+
+  /**
+   * Percentage of record for key data.
+   */
+  public static final String GRIDMIX_KEY_FRC = "gridmix.key.fraction";
+
+
+  private final long targetBytes;
+  private final long targetRecords;
+  private final long step;
+  private final int avgrec;
+  private final int keyLen;
+  private long accBytes = 0L;
+  private long accRecords = 0L;
+
+  /**
+   * @param targetBytes Expected byte count.
+   * @param targetRecords Expected record count.
+   * @param conf Used to resolve edge cases @see #GRIDMIX_KEY_FRC
+   */
+  public AvgRecordFactory(long targetBytes, long targetRecords,
+      Configuration conf) {
+    this.targetBytes = targetBytes;
+    this.targetRecords = targetRecords <= 0 && this.targetBytes >= 0
+      ? Math.max(1,
+          this.targetBytes / conf.getInt("gridmix.missing.rec.size", 64 * 1024))
+      : targetRecords;
+    final long tmp = this.targetBytes / this.targetRecords;
+    step = this.targetBytes - this.targetRecords * tmp;
+    avgrec = (int) Math.min(Integer.MAX_VALUE, tmp + 1);
+    keyLen = Math.max(1,
+        (int)(tmp * Math.min(1.0f, conf.getFloat(GRIDMIX_KEY_FRC, 0.1f))));
+  }
+
+  @Override
+  public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+    if (accBytes >= targetBytes) {
+      return false;
+    }
+    final int reclen = accRecords++ >= step ? avgrec - 1 : avgrec;
+    final int len = (int) Math.min(targetBytes - accBytes, reclen);
+    // len != reclen?
+    if (key != null) {
+      key.setSize(keyLen);
+      val.setSize(len - key.getSize());
+    } else {
+      val.setSize(len);
+    }
+    accBytes += len;
+    return true;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return Math.min(1.0f, accBytes / ((float)targetBytes));
+  }
+
+  @Override
+  public void close() throws IOException {
+    // noop
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java Fri Dec 11 19:32:58 2009
@@ -223,7 +223,6 @@
       return getSize();
     }
 
-    // TODO sort, pick rand pairs of kth large/small in dir
     IndexMapper mapping;
     if ((curdir.size() < 200) || ((double) targetSize / getSize() > 0.5)) {
       mapping = new DenseIndexMapper(curdir.size());
@@ -234,13 +233,13 @@
     ArrayList<Integer> selected = new ArrayList<Integer>();
     long ret = 0L;
     int poolSize = curdir.size();
-    while (ret < targetSize) {
+    do {
       int pos = rand.nextInt(poolSize);
       int index = mapping.get(pos);
       selected.add(index);
       ret += curdir.get(index).getLen();
       mapping.swap(pos, --poolSize);
-    }
+    } while (ret < targetSize);
 
     for (Integer i : selected) {
       files.add(curdir.get(i));

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+/**
+ * Given a {@link org.apache.hadoop.mapreduce.lib.input.CombineFileSplit},
+ * circularly read through each input source.
+ */
+class FileQueue extends InputStream {
+
+  private int idx = -1;
+  private long curlen = -1L;
+  private FSDataInputStream input;
+  private final byte[] z = new byte[1];
+  private final Path[] paths;
+  private final long[] lengths;
+  private final long[] startoffset;
+  private final Configuration conf;
+
+  /**
+   * @param split Description of input sources.
+   * @param conf Used to resolve FileSystem instances.
+   */
+  public FileQueue(CombineFileSplit split, Configuration conf)
+      throws IOException {
+    this.conf = conf;
+    paths = split.getPaths();
+    startoffset = split.getStartOffsets();
+    lengths = split.getLengths();
+    nextSource();
+  }
+
+  protected void nextSource() throws IOException {
+    if (0 == paths.length) {
+      return;
+    }
+    if (input != null) {
+      input.close();
+    }
+    idx = (idx + 1) % paths.length;
+    curlen = lengths[idx];
+    final Path file = paths[idx];
+    final FileSystem fs = file.getFileSystem(conf);
+    input = fs.open(file);
+    input.seek(startoffset[idx]);
+  }
+
+  @Override
+  public int read() throws IOException {
+    final int tmp = read(z);
+    return tmp == -1 ? -1 : (0xFF & z[0]);
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    int kvread = 0;
+    while (kvread < len) {
+      if (curlen <= 0) {
+        nextSource();
+        continue;
+      }
+      final int srcRead = (int) Math.min(len - kvread, curlen);
+      IOUtils.readFully(input, b, kvread, srcRead);
+      curlen -= srcRead;
+      kvread += srcRead;
+    }
+    return kvread;
+  }
+
+  @Override
+  public void close() throws IOException {
+    input.close();
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=889778&r1=889777&r2=889778&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Fri Dec 11 19:32:58 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,7 +29,6 @@
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -52,10 +52,30 @@
 // TODO can replace with form of GridmixJob
 class GenerateData extends GridmixJob {
 
+  /**
+   * Total bytes to write.
+   */
+  public static final String GRIDMIX_GEN_BYTES = "gridmix.gen.bytes";
+
+  /**
+   * Maximum size per file written.
+   */
+  public static final String GRIDMIX_GEN_CHUNK = "gridmix.gen.bytes.per.file";
+
+  /**
+   * Size of writes to output file.
+   */
+  public static final String GRIDMIX_VAL_BYTES = "gendata.val.bytes";
+
+  /**
+   * Status reporting interval, in megabytes.
+ */ + public static final String GRIDMIX_GEN_INTERVAL = "gendata.interval.mb"; + public GenerateData(Configuration conf, Path outdir, long genbytes) throws IOException { super(conf, 0L, "GRIDMIX_GENDATA"); - job.getConfiguration().setLong("gridmix.gendata.bytes", genbytes); + job.getConfiguration().setLong(GRIDMIX_GEN_BYTES, genbytes); FileOutputFormat.setOutputPath(job, outdir); } @@ -84,7 +104,7 @@ protected void setup(Context context) throws IOException, InterruptedException { val = new BytesWritable(new byte[ - context.getConfiguration().getInt("gendata.val.bytes", 1024 * 1024)]); + context.getConfiguration().getInt(GRIDMIX_VAL_BYTES, 1024 * 1024)]); } @Override @@ -106,7 +126,7 @@ final JobClient client = new JobClient(jobCtxt.getConfiguration()); ClusterStatus stat = client.getClusterStatus(true); final long toGen = - jobCtxt.getConfiguration().getLong("gridmix.gendata.bytes", -1); + jobCtxt.getConfiguration().getLong(GRIDMIX_GEN_BYTES, -1); if (toGen < 0) { throw new IOException("Invalid/missing generation bytes: " + toGen); } @@ -144,7 +164,7 @@ throws IOException, InterruptedException { toWrite = split.getLength(); RINTERVAL = ctxt.getConfiguration().getInt( - "gendata.report.interval.mb", 10) << 20; + GRIDMIX_GEN_INTERVAL, 10) << 20; } @Override public boolean nextKeyValue() throws IOException { @@ -219,20 +239,52 @@ public RecordWriter getRecordWriter( TaskAttemptContext job) throws IOException { - Path file = getDefaultWorkFile(job, ""); - FileSystem fs = file.getFileSystem(job.getConfiguration()); - final FSDataOutputStream fileOut = fs.create(file, false); - return new RecordWriter() { - @Override - public void write(NullWritable key, BytesWritable value) - throws IOException { - fileOut.write(value.getBytes(), 0, value.getLength()); - } - @Override - public void close(TaskAttemptContext ctxt) throws IOException { + return new ChunkWriter(getDefaultWorkFile(job, ""), + job.getConfiguration()); + } + + static class ChunkWriter extends RecordWriter { + private final Path outDir; + private final FileSystem fs; + private final long maxFileBytes; + + private long accFileBytes = 0L; + private long fileIdx = -1L; + private OutputStream fileOut = null; + + public ChunkWriter(Path outDir, Configuration conf) throws IOException { + this.outDir = outDir; + fs = outDir.getFileSystem(conf); + maxFileBytes = conf.getLong(GRIDMIX_GEN_CHUNK, 1L << 30); + nextDestination(); + } + private void nextDestination() throws IOException { + if (fileOut != null) { fileOut.close(); } - }; + fileOut = fs.create(new Path(outDir, "segment-" + (++fileIdx)), false); + accFileBytes = 0L; + } + @Override + public void write(NullWritable key, BytesWritable value) + throws IOException { + int written = 0; + final int total = value.getLength(); + while (written < total) { + final int write = (int) + Math.min(total - written, maxFileBytes - accFileBytes); + fileOut.write(value.getBytes(), written, write); + written += write; + accFileBytes += write; + if (accFileBytes >= maxFileBytes) { + nextDestination(); + } + } + } + @Override + public void close(TaskAttemptContext ctxt) throws IOException { + fileOut.close(); + } } } Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=889778&r1=889777&r2=889778&view=diff ============================================================================== --- 
hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Dec 11 19:32:58 2009 @@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapreduce.Job; @@ -73,12 +72,11 @@ "gridmix.client.pending.queue.depth"; /** - * Size of key data in synthetic jobs. At present, key length is not - * available in job traces. Since all solutions are equally bad, globally - * specifying the amount of each record that is key data is the simplest - * to implement and the method chosen. + * Multiplier to accelerate or decelerate job submission. As a crude means of + * sizing a job trace to a cluster, the time separating two jobs is + * multiplied by this factor. */ - public static final String GRIDMIX_KEY_LEN = "gridmix.min.key.length"; + public static final String GRIDMIX_SUB_MUL = "gridmix.submit.multiplier"; // Submit data structures private JobFactory factory; @@ -135,7 +133,7 @@ submitter = createJobSubmitter(monitor, conf.getInt(GRIDMIX_SUB_THR, Runtime.getRuntime().availableProcessors() + 1), - conf.getInt(GRIDMIX_QUE_DEP, 100), + conf.getInt(GRIDMIX_QUE_DEP, 5), new FilePool(conf, ioPath)); factory = createJobFactory(submitter, traceIn, scratchDir, conf, startFlag); monitor.start(); @@ -182,12 +180,10 @@ printUsage(System.err); return 1; } - FileSystem fs = null; InputStream trace = null; try { final Configuration conf = getConf(); Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix")); - fs = scratchDir.getFileSystem(conf); // add shutdown hook for SIGINT, etc. 
Runtime.getRuntime().addShutdownHook(sdh); CountDownLatch startFlag = new CountDownLatch(1); @@ -210,7 +206,7 @@ if (factory != null) { // wait for input exhaustion - factory.join(); + factory.join(Long.MAX_VALUE); final Throwable badTraceException = factory.error(); if (null != badTraceException) { LOG.error("Error in trace", badTraceException); @@ -218,10 +214,10 @@ } // wait for pending tasks to be submitted submitter.shutdown(); - submitter.join(); + submitter.join(Long.MAX_VALUE); // wait for running tasks to complete monitor.shutdown(); - monitor.join(); + monitor.join(Long.MAX_VALUE); } } finally { IOUtils.cleanup(LOG, trace); @@ -236,13 +232,17 @@ */ class Shutdown extends Thread { - private void killComponent(Component component) { + static final long FAC_SLEEP = 1000; + static final long SUB_SLEEP = 4000; + static final long MON_SLEEP = 15000; + + private void killComponent(Component component, long maxwait) { if (component == null) { return; } - component.abort(); // read no more tasks + component.abort(); try { - component.join(); + component.join(maxwait); } catch (InterruptedException e) { LOG.warn("Interrupted waiting for " + component); } @@ -253,9 +253,9 @@ public void run() { LOG.info("Exiting..."); try { - killComponent(factory); // read no more tasks - killComponent(submitter); // submit no more tasks - killComponent(monitor); // process remaining jobs in this thread + killComponent(factory, FAC_SLEEP); // read no more tasks + killComponent(submitter, SUB_SLEEP); // submit no more tasks + killComponent(monitor, MON_SLEEP); // process remaining jobs here } finally { if (monitor == null) { return; @@ -306,7 +306,8 @@ out.printf(" %-40s : Output directory\n", GRIDMIX_OUT_DIR); out.printf(" %-40s : Submitting threads\n", GRIDMIX_SUB_THR); out.printf(" %-40s : Queued job desc\n", GRIDMIX_QUE_DEP); - out.printf(" %-40s : Key size\n", GRIDMIX_KEY_LEN); + out.printf(" %-40s : Key fraction of rec\n", + AvgRecordFactory.GRIDMIX_KEY_FRC); } /** @@ -331,7 +332,7 @@ * Wait until the service completes. It is assumed that either a * {@link #shutdown} or {@link #abort} has been requested. */ - void join() throws InterruptedException; + void join(long millis) throws InterruptedException; /** * Shut down gracefully, finishing all pending work. Reject new requests. 
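The Component.join(long) change above threads a wait bound through the whole
teardown path. A simplified sketch of the pattern (interface and class names
here are illustrative; the real ones live in Gridmix.java, and the timeouts
mirror the FAC_SLEEP/SUB_SLEEP/MON_SLEEP constants in the diff):

    // Each pipeline stage is aborted, then joined with a per-stage timeout
    // so a stuck component cannot hang JVM exit from the shutdown hook.
    interface Component {
      void abort();                                        // stop taking work
      void join(long millis) throws InterruptedException;  // bounded wait
    }

    class TeardownSketch {
      static void killComponent(Component component, long maxwait) {
        if (component == null) {
          return;
        }
        component.abort();
        try {
          component.join(maxwait);
        } catch (InterruptedException e) {
          System.err.println("Interrupted waiting for " + component);
        }
      }

      void run(Component factory, Component submitter, Component monitor) {
        killComponent(factory, 1000L);    // read no more tasks
        killComponent(submitter, 4000L);  // submit no more tasks
        killComponent(monitor, 15000L);   // process remaining jobs
      }
    }

With join now taking a timeout, the normal (non-shutdown) path in run()
passes Long.MAX_VALUE where it previously called join() with no argument,
preserving the original wait-forever behavior there.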
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=889778&r1=889777&r2=889778&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Fri Dec 11 19:32:58 2009 @@ -17,22 +17,10 @@ */ package org.apache.hadoop.mapred.gridmix; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.nio.IntBuffer; -import java.nio.LongBuffer; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.EnumSet; import java.util.Formatter; -import java.util.HashMap; import java.util.List; -import java.util.Map.Entry; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -40,15 +28,10 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Writable; @@ -65,7 +48,6 @@ import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.tools.rumen.JobStory; @@ -80,6 +62,7 @@ class GridmixJob implements Callable, Delayed { public static final String JOBNAME = "GRIDMIX"; + public static final String ORIGNAME = "gridmix.job.name.original"; public static final Log LOG = LogFactory.getLog(GridmixJob.class); private static final ThreadLocal nameFormat = @@ -180,14 +163,17 @@ job.setReducerClass(GridmixReducer.class); job.setNumReduceTasks(jobdesc.getNumberReduces()); job.setMapOutputKeyClass(GridmixKey.class); - job.setMapOutputValueClass(BytesWritable.class); - job.setSortComparatorClass(BytesWritable.Comparator.class); + job.setMapOutputValueClass(GridmixRecord.class); + job.setSortComparatorClass(GridmixKey.Comparator.class); job.setGroupingComparatorClass(SpecGroupingComparator.class); job.setInputFormatClass(GridmixInputFormat.class); job.setOutputFormatClass(RawBytesOutputFormat.class); job.setPartitionerClass(DraftPartitioner.class); job.setJarByClass(GridmixJob.class); job.getConfiguration().setInt("gridmix.job.seq", seq); + job.getConfiguration().set(ORIGNAME, null == jobdesc.getJobID() + ? 
"" : jobdesc.getJobID().toString()); + job.getConfiguration().setBoolean(Job.USED_GENERIC_PARSER, true); FileInputFormat.addInputPath(job, new Path("ignored")); FileOutputFormat.setOutputPath(job, outdir); job.submit(); @@ -200,11 +186,10 @@ } } - /** - * Group REDUCE_SPEC records together - */ public static class SpecGroupingComparator - implements RawComparator, Serializable { + implements RawComparator { + private final DataInputBuffer di = new DataInputBuffer(); + private final byte[] reset = di.getData(); @Override public int compare(GridmixKey g1, GridmixKey g2) { final byte t1 = g1.getType(); @@ -215,284 +200,128 @@ } assert t1 == GridmixKey.DATA; assert t2 == GridmixKey.DATA; - return WritableComparator.compareBytes( - g1.getBytes(), 0, g1.getLength(), - g2.getBytes(), 0, g2.getLength()); + return g1.compareTo(g2); } @Override - public int compare(byte[] b1, int s1, int l1, - byte[] b2, int s2, int l2) { - final byte t1 = b1[s1 + 4]; - final byte t2 = b2[s2 + 4]; - if (t1 == GridmixKey.REDUCE_SPEC || - t2 == GridmixKey.REDUCE_SPEC) { - return t1 - t2; - } - assert t1 == GridmixKey.DATA; - assert t2 == GridmixKey.DATA; - return WritableComparator.compareBytes( - b1, s1 + 4, l1 - 4, - b2, s2 + 4, l2 - 4); - } - } - - /** - * Keytype for synthetic jobs, some embedding instructions for the reduce. - */ - public static class GridmixKey extends BytesWritable { - // long fields specifying reduce contract - private enum RSpec { REC_IN, REC_OUT, BYTES_OUT }; - private static final int SPEC_START = 5; // type + partition len - private static final int NUMFIELDS = RSpec.values().length; - private static final int SPEC_SIZE = NUMFIELDS * 8; - - // Key types - static final byte REDUCE_SPEC = 0; - static final byte DATA = 1; - - private IntBuffer partition; - private LongBuffer spec; - - public GridmixKey() { - super(new byte[SPEC_START]); - } - - public GridmixKey(byte type, byte[] b) { - super(b); - setType(type); - } - - public byte getType() { - return getBytes()[0]; - } - public void setPartition(int partition) { - this.partition.put(0, partition); - } - public int getPartition() { - return partition.get(0); - } - public long getReduceInputRecords() { - checkState(REDUCE_SPEC); - return spec.get(RSpec.REC_IN.ordinal()); - } - public long getReduceOutputBytes() { - checkState(REDUCE_SPEC); - return spec.get(RSpec.BYTES_OUT.ordinal()); - } - public long getReduceOutputRecords() { - checkState(REDUCE_SPEC); - return spec.get(RSpec.REC_OUT.ordinal()); - } - public void setType(byte b) { - switch (b) { - case REDUCE_SPEC: - if (getCapacity() < SPEC_START + SPEC_SIZE) { - setSize(SPEC_START + SPEC_SIZE); - } - spec = - ByteBuffer.wrap(getBytes(), SPEC_START, SPEC_SIZE).asLongBuffer(); - break; - case DATA: - if (getCapacity() < SPEC_START) { - setSize(SPEC_START); - } - spec = null; - break; - default: - throw new IllegalArgumentException("Illegal type " + b); - } - getBytes()[0] = b; - partition = - ByteBuffer.wrap(getBytes(), 1, SPEC_START - 1).asIntBuffer(); - } - public void setReduceInputRecords(long records) { - checkState(REDUCE_SPEC); - spec.put(RSpec.REC_IN.ordinal(), records); - } - public void setReduceOutputBytes(long bytes) { - checkState(REDUCE_SPEC); - spec.put(RSpec.BYTES_OUT.ordinal(), bytes); - } - public void setReduceOutputRecords(long records) { - checkState(REDUCE_SPEC); - spec.put(RSpec.REC_OUT.ordinal(), records); - } - private void checkState(byte b) { - if (getLength() < SPEC_START || getType() != b) { - throw new IllegalStateException("Expected " + b + ", was " + 
getType()); - } - } - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - if (getLength() < SPEC_START) { - throw new IOException("Invalid GridmixKey, len " + getLength()); - } - partition = - ByteBuffer.wrap(getBytes(), 1, SPEC_START - 1).asIntBuffer(); - spec = getType() == REDUCE_SPEC - ? ByteBuffer.wrap(getBytes(), SPEC_START, SPEC_SIZE).asLongBuffer() - : null; - } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - if (getType() == REDUCE_SPEC) { - LOG.debug("SPEC(" + getPartition() + ") " + getReduceInputRecords() + - " -> " + getReduceOutputRecords() + "/" + getReduceOutputBytes()); - } - } - @Override - public boolean equals(Object other) { - if (other instanceof GridmixKey) { - return super.equals(other); + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + try { + final int ret; + di.reset(b1, s1, l1); + final int x1 = WritableUtils.readVInt(di); + di.reset(b2, s2, l2); + final int x2 = WritableUtils.readVInt(di); + final int t1 = b1[s1 + x1]; + final int t2 = b2[s2 + x2]; + if (t1 == GridmixKey.REDUCE_SPEC || + t2 == GridmixKey.REDUCE_SPEC) { + ret = t1 - t2; + } else { + assert t1 == GridmixKey.DATA; + assert t2 == GridmixKey.DATA; + ret = + WritableComparator.compareBytes(b1, s1, x1, b2, s2, x2); + } + di.reset(reset, 0, 0); + return ret; + } catch (IOException e) { + throw new RuntimeException(e); } - return false; - } - - @Override - public int hashCode() { - return super.hashCode(); } } public static class GridmixMapper - extends Mapper { - - private final Random r = new Random(); - private GridmixKey key; - private final BytesWritable val = new BytesWritable(); + extends Mapper { - private int keyLen; private double acc; private double ratio; - private int[] reduceRecordSize; - private long[] reduceRecordCount; - private long[] reduceRecordRemaining; + private final ArrayList reduces = + new ArrayList(); + private final Random r = new Random(); + + private final GridmixKey key = new GridmixKey(); + private final GridmixRecord val = new GridmixRecord(); @Override - protected void setup(Context context) + protected void setup(Context ctxt) throws IOException, InterruptedException { - // TODO clearly job-specific, but no data at present - keyLen = context.getConfiguration().getInt(Gridmix.GRIDMIX_KEY_LEN, 20); - key = new GridmixKey(GridmixKey.DATA, new byte[keyLen]); - final GridmixSplit split = (GridmixSplit) context.getInputSplit(); - LOG.info("ID: " + split.getId()); - reduceRecordCount = split.getOutputRecords(); - reduceRecordRemaining = - Arrays.copyOf(reduceRecordCount, reduceRecordCount.length); - reduceRecordSize = new int[reduceRecordCount.length]; - int valsize = -1; + final Configuration conf = ctxt.getConfiguration(); + final GridmixSplit split = (GridmixSplit) ctxt.getInputSplit(); + final int maps = split.getMapCount(); final long[] reduceBytes = split.getOutputBytes(); + final long[] reduceRecords = split.getOutputRecords(); + long totalRecords = 0L; - for (int i = 0; i < reduceBytes.length; ++i) { - reduceRecordSize[i] = Math.max(0, - Math.round(reduceBytes[i] / (1.0f * reduceRecordCount[i])) - keyLen); - valsize = Math.max(reduceRecordSize[i], valsize); - totalRecords += reduceRecordCount[i]; - } - valsize = Math.max(0, valsize - 4); // BW len encoding - val.setCapacity(valsize); - val.setSize(valsize); - ratio = totalRecords / (1.0 * split.getInputRecords()); + final int nReduces = ctxt.getNumReduceTasks(); + if (nReduces > 0) { + int idx = 0; + 
int id = split.getId(); + for (int i = 0; i < nReduces; ++i) { + final GridmixKey.Spec spec = new GridmixKey.Spec(); + if (i == id) { + spec.bytes_out = split.getReduceBytes(idx); + spec.rec_out = split.getReduceRecords(idx); + ++idx; + id += maps; + } + reduces.add(new IntermediateRecordFactory( + new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf), + i, reduceRecords[i], spec, conf)); + totalRecords += reduceRecords[i]; + } + } else { + reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0], + conf)); + totalRecords = reduceRecords[0]; + } + final long splitRecords = split.getInputRecords(); + final long inputRecords = splitRecords <= 0 && split.getLength() >= 0 + ? Math.max(1, + split.getLength() / conf.getInt("gridmix.missing.rec.size", 64*1024)) + : splitRecords; + ratio = totalRecords / (1.0 * inputRecords); acc = 0.0; } - protected void fillBytes(BytesWritable val, int len) { - r.nextBytes(val.getBytes()); - val.setSize(len); - } - - /** Find next non-empty partition after start. */ - private int getNextPart(final int start) { - int p = start; - do { - p = (p + 1) % reduceRecordSize.length; - } while (0 == reduceRecordRemaining[p] && p != start); - return 0 == reduceRecordRemaining[p] ? -1 : p; - } - @Override - public void map(IntWritable ignored, BytesWritable bytes, + public void map(NullWritable ignored, GridmixRecord rec, Context context) throws IOException, InterruptedException { - int p = getNextPart(r.nextInt(reduceRecordSize.length)); - if (-1 == p) { - return; - } acc += ratio; - while (acc >= 1.0) { - fillBytes(key, key.getLength()); - key.setType(GridmixKey.DATA); - key.setPartition(p); - --reduceRecordRemaining[p]; - fillBytes(val, reduceRecordSize[p]); + while (acc >= 1.0 && !reduces.isEmpty()) { + key.setSeed(r.nextLong()); + val.setSeed(r.nextLong()); + final int idx = r.nextInt(reduces.size()); + final RecordFactory f = reduces.get(idx); + if (!f.next(key, val)) { + reduces.remove(idx); + continue; + } context.write(key, val); acc -= 1.0; - if (0 == reduceRecordRemaining[p] && -1 == (p = getNextPart(p))) { - return; - } } } @Override public void cleanup(Context context) throws IOException, InterruptedException { - // output any remaining records - // TODO include reduce spec in remaining records if avail - // (i.e. 
move this to map) - for (int i = 0; i < reduceRecordSize.length; ++i) { - for (long j = reduceRecordRemaining[i]; j > 0; --j) { - fillBytes(key, key.getLength()); - key.setType(GridmixKey.DATA); - key.setPartition(i); - fillBytes(val, reduceRecordSize[i]); + for (RecordFactory factory : reduces) { + key.setSeed(r.nextLong()); + while (factory.next(key, val)) { context.write(key, val); + key.setSeed(r.nextLong()); } } - val.setSize(0); - key.setType(GridmixKey.REDUCE_SPEC); - final int reduces = context.getNumReduceTasks(); - final GridmixSplit split = (GridmixSplit) context.getInputSplit(); - final int maps = split.getMapCount(); - int idx = 0; - int id = split.getId(); - for (int i = 0; i < reduces; ++i) { - key.setPartition(i); - key.setReduceInputRecords(reduceRecordCount[i]); - // Write spec for all red st r_id % id == 0 - if (i == id) { - key.setReduceOutputBytes(split.getReduceBytes(idx)); - key.setReduceOutputRecords(split.getReduceRecords(idx)); - LOG.debug(String.format("SPEC'D %d / %d to %d", - split.getReduceRecords(idx), split.getReduceBytes(idx), i)); - ++idx; - id += maps; - } else { - key.setReduceOutputBytes(0); - key.setReduceOutputRecords(0); - } - context.write(key, val); - } } } public static class GridmixReducer - extends Reducer { + extends Reducer { private final Random r = new Random(); - private final BytesWritable val = new BytesWritable(); + private final GridmixRecord val = new GridmixRecord(); private double acc; private double ratio; - private long written; - private long inRecords = 0L; - private long outBytes = 0L; - private long outRecords = 0L; - - protected void fillBytes(BytesWritable val, int len) { - r.nextBytes(val.getBytes()); - val.setSize(len); - } + private RecordFactory factory; @Override protected void setup(Context context) @@ -501,62 +330,52 @@ context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) { throw new IOException("Missing reduce spec"); } - for (BytesWritable ignored : context.getValues()) { + long outBytes = 0L; + long outRecords = 0L; + long inRecords = 0L; + for (GridmixRecord ignored : context.getValues()) { final GridmixKey spec = context.getCurrentKey(); inRecords += spec.getReduceInputRecords(); - LOG.debug("GOT COUNT " + spec.getReduceInputRecords()); outBytes += spec.getReduceOutputBytes(); outRecords += spec.getReduceOutputRecords(); } - LOG.debug("GOT SPEC " + outRecords + "/" + outBytes); - val.setCapacity(Math.round(outBytes / (1.0f * outRecords))); + if (0 == outRecords && inRecords > 0) { + LOG.info("Spec output bytes w/o records. 
Using input record count"); + outRecords = inRecords; + } + factory = + new AvgRecordFactory(outBytes, outRecords, context.getConfiguration()); ratio = outRecords / (1.0 * inRecords); acc = 0.0; - LOG.debug(String.format("RECV %d -> %10d/%10d %d %f", inRecords, - outRecords, outBytes, val.getCapacity(), ratio)); } @Override - protected void reduce(GridmixKey key, Iterable values, + protected void reduce(GridmixKey key, Iterable values, Context context) throws IOException, InterruptedException { - for (BytesWritable ignored : values) { + for (GridmixRecord ignored : values) { acc += ratio; - while (acc >= 1.0 && written < outBytes) { - final int len = (int) Math.min(outBytes - written, val.getCapacity()); - fillBytes(val, len); + while (acc >= 1.0 && factory.next(null, val)) { context.write(NullWritable.get(), val); acc -= 1.0; - written += len; - LOG.debug(String.format("%f %d/%d", acc, written, outBytes)); } } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { - while (written < outBytes) { - final int len = (int) Math.min(outBytes - written, val.getCapacity()); - fillBytes(val, len); + val.setSeed(r.nextLong()); + while (factory.next(null, val)) { context.write(NullWritable.get(), val); - written += len; + val.setSeed(r.nextLong()); } } } static class GridmixRecordReader - extends RecordReader { + extends RecordReader { - private long bytesRead = 0; - private long bytesTotal; - private Configuration conf; - private final IntWritable key = new IntWritable(); - private final BytesWritable inBytes = new BytesWritable(); - - private FSDataInputStream input; - private int idx = -1; - private int capacity; - private Path[] paths; - private long[] startoffset; - private long[] lengths; + private RecordFactory factory; + private final Random r = new Random(); + private final GridmixRecord val = new GridmixRecord(); public GridmixRecordReader() { } @@ -564,178 +383,36 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt) throws IOException, InterruptedException { final GridmixSplit split = (GridmixSplit)genericSplit; - this.conf = ctxt.getConfiguration(); - paths = split.getPaths(); - startoffset = split.getStartOffsets(); - lengths = split.getLengths(); - bytesTotal = split.getLength(); - capacity = (int) Math.round(bytesTotal / (1.0 * split.getInputRecords())); - inBytes.setCapacity(capacity); - nextSource(); - } - private void nextSource() throws IOException { - idx = (idx + 1) % paths.length; - final Path file = paths[idx]; - final FileSystem fs = file.getFileSystem(conf); - input = fs.open(file, capacity); - input.seek(startoffset[idx]); + final Configuration conf = ctxt.getConfiguration(); + factory = new ReadRecordFactory(split.getLength(), + split.getInputRecords(), new FileQueue(split, conf), conf); } + @Override public boolean nextKeyValue() throws IOException { - if (bytesRead >= bytesTotal) { - return false; - } - final int len = (int) - Math.min(bytesTotal - bytesRead, inBytes.getCapacity()); - int kvread = 0; - while (kvread < len) { - assert lengths[idx] >= 0; - if (lengths[idx] <= 0) { - nextSource(); - continue; - } - final int srcRead = (int) Math.min(len - kvread, lengths[idx]); - IOUtils.readFully(input, inBytes.getBytes(), kvread, srcRead); - //LOG.trace("Read " + srcRead + " bytes from " + paths[idx]); - lengths[idx] -= srcRead; - kvread += srcRead; - } - bytesRead += kvread; - return true; + val.setSeed(r.nextLong()); + return factory.next(null, val); } @Override public float getProgress() throws IOException 
{ - return bytesRead / ((float)bytesTotal); + return factory.getProgress(); } @Override - public IntWritable getCurrentKey() { return key; } + public NullWritable getCurrentKey() { + return NullWritable.get(); + } @Override - public BytesWritable getCurrentValue() { return inBytes; } + public GridmixRecord getCurrentValue() { + return val; + } @Override public void close() throws IOException { - IOUtils.cleanup(null, input); - } - } - - static class GridmixSplit extends CombineFileSplit { - private int id; - private int nSpec; - private int maps; - private int reduces; - private long inputRecords; - private long outputBytes; - private long outputRecords; - private long maxMemory; - private double[] reduceBytes = new double[0]; - private double[] reduceRecords = new double[0]; - - // Spec for reduces id mod this - private long[] reduceOutputBytes = new long[0]; - private long[] reduceOutputRecords = new long[0]; - - GridmixSplit() { - super(); - } - - public GridmixSplit(CombineFileSplit cfsplit, int maps, int id, - long inputBytes, long inputRecords, long outputBytes, - long outputRecords, double[] reduceBytes, double[] reduceRecords, - long[] reduceOutputBytes, long[] reduceOutputRecords) - throws IOException { - super(cfsplit); - this.id = id; - this.maps = maps; - reduces = reduceBytes.length; - this.inputRecords = inputRecords; - this.outputBytes = outputBytes; - this.outputRecords = outputRecords; - this.reduceBytes = Arrays.copyOf(reduceBytes, reduces); - this.reduceRecords = Arrays.copyOf(reduceRecords, reduces); - nSpec = reduceOutputBytes.length; - this.reduceOutputBytes = reduceOutputBytes; - this.reduceOutputRecords = reduceOutputRecords; - } - public int getId() { - return id; - } - public int getMapCount() { - return maps; - } - public long getInputRecords() { - return inputRecords; - } - public long[] getOutputBytes() { - final long[] ret = new long[reduces]; - for (int i = 0; i < reduces; ++i) { - ret[i] = Math.round(outputBytes * reduceBytes[i]); - } - return ret; - } - public long[] getOutputRecords() { - final long[] ret = new long[reduces]; - for (int i = 0; i < reduces; ++i) { - ret[i] = Math.round(outputRecords * reduceRecords[i]); - } - return ret; - } - public long getReduceBytes(int i) { - return reduceOutputBytes[i]; - } - public long getReduceRecords(int i) { - return reduceOutputRecords[i]; - } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - WritableUtils.writeVInt(out, id); - WritableUtils.writeVInt(out, maps); - WritableUtils.writeVLong(out, inputRecords); - WritableUtils.writeVLong(out, outputBytes); - WritableUtils.writeVLong(out, outputRecords); - WritableUtils.writeVLong(out, maxMemory); - WritableUtils.writeVInt(out, reduces); - for (int i = 0; i < reduces; ++i) { - out.writeDouble(reduceBytes[i]); - out.writeDouble(reduceRecords[i]); - } - WritableUtils.writeVInt(out, nSpec); - for (int i = 0; i < nSpec; ++i) { - out.writeLong(reduceOutputBytes[i]); - out.writeLong(reduceOutputRecords[i]); - } - } - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - id = WritableUtils.readVInt(in); - maps = WritableUtils.readVInt(in); - inputRecords = WritableUtils.readVLong(in); - outputBytes = WritableUtils.readVLong(in); - outputRecords = WritableUtils.readVLong(in); - maxMemory = WritableUtils.readVLong(in); - reduces = WritableUtils.readVInt(in); - if (reduceBytes.length < reduces) { - reduceBytes = new double[reduces]; - reduceRecords = new double[reduces]; - } - for (int i = 0; 
i < reduces; ++i) { - reduceBytes[i] = in.readDouble(); - reduceRecords[i] = in.readDouble(); - } - nSpec = WritableUtils.readVInt(in); - if (reduceOutputBytes.length < nSpec) { - reduceOutputBytes = new long[nSpec]; - reduceOutputRecords = new long[nSpec]; - } - for (int i = 0; i < nSpec; ++i) { - reduceOutputBytes[i] = in.readLong(); - reduceOutputRecords[i] = in.readLong(); - } + factory.close(); } } static class GridmixInputFormat - extends InputFormat { + extends InputFormat { @Override public List getSplits(JobContext jobCtxt) throws IOException { @@ -743,29 +420,28 @@ "gridmix.job.seq", -1)); } @Override - public RecordReader createRecordReader( + public RecordReader createRecordReader( InputSplit split, final TaskAttemptContext taskContext) throws IOException { return new GridmixRecordReader(); } } - static class RawBytesOutputFormat - extends FileOutputFormat { + static class RawBytesOutputFormat + extends FileOutputFormat { @Override - public RecordWriter getRecordWriter( + public RecordWriter getRecordWriter( TaskAttemptContext job) throws IOException { Path file = getDefaultWorkFile(job, ""); FileSystem fs = file.getFileSystem(job.getConfiguration()); final FSDataOutputStream fileOut = fs.create(file, false); - return new RecordWriter() { + return new RecordWriter() { @Override - public void write(NullWritable key, BytesWritable value) + public void write(K ignored, GridmixRecord value) throws IOException { - //LOG.trace("WROTE " + value.getLength() + " bytes"); - fileOut.write(value.getBytes(), 0, value.getLength()); + value.writeRandom(fileOut, value.getSize()); } @Override public void close(TaskAttemptContext ctxt) throws IOException { @@ -829,8 +505,10 @@ jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps); specBytes[j] = info.getOutputBytes(); specRecords[j] = info.getOutputRecords(); - LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i, - i + j * maps, info.getOutputRecords(), info.getOutputBytes())); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i, + i + j * maps, info.getOutputRecords(), info.getOutputBytes())); + } } final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i); splits.add(new GridmixSplit(striper.splitFor(inputDir, @@ -842,77 +520,4 @@ pushDescription(id(), splits); } - static class InputStriper { - int idx; - long currentStart; - FileStatus current; - final List files = new ArrayList(); - - InputStriper(FilePool inputDir, long mapBytes) - throws IOException { - final long inputBytes = inputDir.getInputFiles(mapBytes, files); - if (mapBytes > inputBytes) { - LOG.warn("Using " + inputBytes + "/" + mapBytes + " bytes"); - } - current = files.get(0); - } - - CombineFileSplit splitFor(FilePool inputDir, long bytes, int nLocs) - throws IOException { - final ArrayList paths = new ArrayList(); - final ArrayList start = new ArrayList(); - final ArrayList length = new ArrayList(); - final HashMap sb = new HashMap(); - while (bytes > 0) { - paths.add(current.getPath()); - start.add(currentStart); - final long fromFile = Math.min(bytes, current.getLen() - currentStart); - length.add(fromFile); - for (BlockLocation loc : - inputDir.locationsFor(current, currentStart, fromFile)) { - final double tedium = loc.getLength() / (1.0 * bytes); - for (String l : loc.getHosts()) { - Double j = sb.get(l); - if (null == j) { - sb.put(l, tedium); - } else { - sb.put(l, j.doubleValue() + tedium); - } - } - } - currentStart += fromFile; - bytes -= fromFile; - if (current.getLen() - currentStart == 0) { - current = files.get(++idx % 
files.size()); - currentStart = 0; - } - } - final ArrayList> sort = - new ArrayList>(sb.entrySet()); - Collections.sort(sort, hostRank); - final String[] hosts = new String[Math.min(nLocs, sort.size())]; - for (int i = 0; i < nLocs && i < sort.size(); ++i) { - hosts[i] = sort.get(i).getKey(); - } - return new CombineFileSplit(paths.toArray(new Path[0]), - toLongArray(start), toLongArray(length), hosts); - } - - private long[] toLongArray(final ArrayList sigh) { - final long[] ret = new long[sigh.size()]; - for (int i = 0; i < ret.length; ++i) { - ret[i] = sigh.get(i); - } - return ret; - } - - final Comparator> hostRank = - new Comparator>() { - public int compare(Entry a, Entry b) { - final double va = a.getValue(); - final double vb = b.getValue(); - return va > vb ? -1 : va < vb ? 1 : 0; - } - }; - } } Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java?rev=889778&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java (added) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java Fri Dec 11 19:32:58 2009 @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.gridmix; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.WritableComparator; + +class GridmixKey extends GridmixRecord { + static final byte REDUCE_SPEC = 0; + static final byte DATA = 1; + + static final int META_BYTES = 1; + + private byte type; + private int partition; // NOT serialized + private Spec spec = new Spec(); + + GridmixKey() { + this(DATA, 1, 0L); + } + GridmixKey(byte type, int size, long seed) { + super(size, seed); + this.type = type; + // setting type may change pcnt random bytes + setSize(size); + } + + @Override + public int getSize() { + switch (type) { + case REDUCE_SPEC: + return super.getSize() + spec.getSize() + META_BYTES; + case DATA: + return super.getSize() + META_BYTES; + default: + throw new IllegalStateException("Invalid type: " + type); + } + } + + @Override + public void setSize(int size) { + switch (type) { + case REDUCE_SPEC: + super.setSize(size - (META_BYTES + spec.getSize())); + break; + case DATA: + super.setSize(size - META_BYTES); + break; + default: + throw new IllegalStateException("Invalid type: " + type); + } + } + + /** + * Partition is not serialized. + */ + public int getPartition() { + return partition; + } + public void setPartition(int partition) { + this.partition = partition; + } + + public long getReduceInputRecords() { + assert REDUCE_SPEC == getType(); + return spec.rec_in; + } + public void setReduceInputRecords(long rec_in) { + assert REDUCE_SPEC == getType(); + final int origSize = getSize(); + spec.rec_in = rec_in; + setSize(origSize); + } + + public long getReduceOutputRecords() { + assert REDUCE_SPEC == getType(); + return spec.rec_out; + } + public void setReduceOutputRecords(long rec_out) { + assert REDUCE_SPEC == getType(); + final int origSize = getSize(); + spec.rec_out = rec_out; + setSize(origSize); + } + + public long getReduceOutputBytes() { + assert REDUCE_SPEC == getType(); + return spec.bytes_out; + }; + public void setReduceOutputBytes(long b_out) { + assert REDUCE_SPEC == getType(); + final int origSize = getSize(); + spec.bytes_out = b_out; + setSize(origSize); + } + + public byte getType() { + return type; + } + public void setType(byte type) throws IOException { + final int origSize = getSize(); + switch (type) { + case REDUCE_SPEC: + case DATA: + this.type = type; + break; + default: + throw new IOException("Invalid type: " + type); + } + setSize(origSize); + } + + public void setSpec(Spec spec) { + assert REDUCE_SPEC == getType(); + final int origSize = getSize(); + this.spec.set(spec); + setSize(origSize); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + setType(in.readByte()); + if (REDUCE_SPEC == getType()) { + spec.readFields(in); + } + } + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + final byte t = getType(); + out.writeByte(t); + if (REDUCE_SPEC == t) { + spec.write(out); + } + } + int fixedBytes() { + return super.fixedBytes() + + (REDUCE_SPEC == getType() ? 
spec.getSize() : 0) + META_BYTES; + } + @Override + public int compareTo(GridmixRecord other) { + final GridmixKey o = (GridmixKey) other; + final byte t1 = getType(); + final byte t2 = o.getType(); + if (t1 != t2) { + return t1 - t2; + } + return super.compareTo(other); + } + + /** + * Note that while the spec is not explicitly included, changing the spec + * may change its size, which will affect equality. + */ + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other != null && other.getClass() == getClass()) { + final GridmixKey o = ((GridmixKey)other); + return getType() == o.getType() && super.equals(o); + } + return false; + } + + @Override + public int hashCode() { + return super.hashCode() ^ getType(); + } + + public static class Spec implements Writable { + long rec_in; + long rec_out; + long bytes_out; + public Spec() { } + + public void set(Spec other) { + rec_in = other.rec_in; + bytes_out = other.bytes_out; + rec_out = other.rec_out; + } + + public int getSize() { + return WritableUtils.getVIntSize(rec_in) + + WritableUtils.getVIntSize(rec_out) + + WritableUtils.getVIntSize(bytes_out); + } + + @Override + public void readFields(DataInput in) throws IOException { + rec_in = WritableUtils.readVLong(in); + rec_out = WritableUtils.readVLong(in); + bytes_out = WritableUtils.readVLong(in); + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeVLong(out, rec_in); + WritableUtils.writeVLong(out, rec_out); + WritableUtils.writeVLong(out, bytes_out); + } + } + + public static class Comparator extends GridmixRecord.Comparator { + + private final DataInputBuffer di = new DataInputBuffer(); + private final byte[] reset = di.getData(); + + public Comparator() { + super(GridmixKey.class); + } + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + try { + di.reset(b1, s1, l1); + final int x1 = WritableUtils.readVInt(di); + di.reset(b2, s2, l2); + final int x2 = WritableUtils.readVInt(di); + final int ret = (b1[s1 + x1] != b2[s2 + x2]) + ? b1[s1 + x1] - b2[s2 + x2] + : super.compare(b1, s1, x1, b2, s2, x2); + di.reset(reset, 0, 0); + return ret; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + static { + WritableComparator.define(GridmixKey.class, new Comparator()); + } + } +} + Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java?rev=889778&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java (added) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java Fri Dec 11 19:32:58 2009 @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+class GridmixRecord implements WritableComparable<GridmixRecord> {
+
+  private static final int FIXED_BYTES = 1;
+  private int size = -1;
+  private long seed;
+  private final DataInputBuffer dib =
+    new DataInputBuffer();
+  private final DataOutputBuffer dob =
+    new DataOutputBuffer(Long.SIZE / Byte.SIZE);
+  private byte[] literal = dob.getData();
+
+  GridmixRecord() {
+    this(1, 0L);
+  }
+
+  GridmixRecord(int size, long seed) {
+    this.seed = seed;
+    setSizeInternal(size);
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  public void setSize(int size) {
+    setSizeInternal(size);
+  }
+
+  private void setSizeInternal(int size) {
+    this.size = Math.max(1, size);
+    try {
+      seed = maskSeed(seed, this.size);
+      dob.reset();
+      dob.writeLong(seed);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public final void setSeed(long seed) {
+    this.seed = seed;
+  }
+
+  /** Xorshift pseudo-random generator (Marsaglia, 2003). */
+  long nextRand(long x) {
+    x ^= (x << 13);
+    x ^= (x >>> 7);
+    return (x ^= (x << 17));
+  }
+
+  // Write exactly size pseudo-random bytes derived from the record seed.
+  public void writeRandom(DataOutput out, final int size) throws IOException {
+    long tmp = seed;
+    out.writeLong(tmp);
+    int i = size - (Long.SIZE / Byte.SIZE);
+    while (i > Long.SIZE / Byte.SIZE - 1) {
+      tmp = nextRand(tmp);
+      out.writeLong(tmp);
+      i -= Long.SIZE / Byte.SIZE;
+    }
+    for (tmp = nextRand(tmp); i > 0; --i) {
+      out.writeByte((int)(tmp & 0xFF));
+      tmp >>>= Byte.SIZE;
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    size = WritableUtils.readVInt(in);
+    int payload = size - WritableUtils.getVIntSize(size);
+    if (payload > Long.SIZE / Byte.SIZE) {
+      seed = in.readLong();
+      payload -= Long.SIZE / Byte.SIZE;
+    } else {
+      Arrays.fill(literal, (byte)0);
+      in.readFully(literal, 0, payload);
+      dib.reset(literal, 0, literal.length);
+      seed = dib.readLong();
+      payload = 0;
+    }
+    final int vBytes = in.skipBytes(payload);
+    if (vBytes != payload) {
+      throw new EOFException("Expected " + payload + ", read " + vBytes);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // data bytes including vint encoding
+    WritableUtils.writeVInt(out, size);
+    final int payload = size - WritableUtils.getVIntSize(size);
+    if (payload > Long.SIZE / Byte.SIZE) {
+      writeRandom(out, payload);
+    } else if (payload > 0) {
+      out.write(literal, 0, payload);
+    }
+  }
+
+  @Override
+  public int compareTo(GridmixRecord other) {
+    return compareSeed(other.seed,
+      Math.max(0, other.getSize() - other.fixedBytes()));
+  }
+
+  int fixedBytes() {
+    // min vint size
+    return FIXED_BYTES;
+  }
+
+  private static long maskSeed(long sd, int sz) {
+    // Don't use fixedBytes here; subclasses will set intended random len
+    if (sz <= FIXED_BYTES) {
+      sd = 0L;
+    } else if (sz < Long.SIZE / Byte.SIZE + FIXED_BYTES) {
+      final int tmp = sz - FIXED_BYTES;
+      final long mask = (1L << (Byte.SIZE * tmp)) - 1;
+      sd &= mask << (Byte.SIZE * (Long.SIZE / Byte.SIZE - tmp));
+    }
+    return sd;
+  }
+
+  int compareSeed(long jSeed, int jSize) {
+    final int iSize = Math.max(0, getSize() - fixedBytes());
+    final int seedLen = Math.min(iSize, jSize) + FIXED_BYTES;
+    jSeed = maskSeed(jSeed, seedLen);
+    long iSeed = maskSeed(seed, seedLen);
+    final int cmplen = Math.min(iSize, jSize);
+    for (int i = 0; i < cmplen; i += Byte.SIZE) {
+      final int k = cmplen - i;
+      for (long j = Long.SIZE - Byte.SIZE;
+           j >= Math.max(0, Long.SIZE / Byte.SIZE - k) * Byte.SIZE;
+           j -= Byte.SIZE) {
+        final int xi = (int)((iSeed >>> j) & 0xFFL);
+        final int xj = (int)((jSeed >>> j) & 0xFFL);
+        if (xi != xj) {
+          return xi - xj;
+        }
+      }
+      iSeed = nextRand(iSeed);
+      jSeed = nextRand(jSeed);
+    }
+    return iSize - jSize;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other != null && other.getClass() == getClass()) {
+      final GridmixRecord o = ((GridmixRecord)other);
+      return getSize() == o.getSize() && seed == o.seed;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)(seed * getSize());
+  }
+
+  public static class Comparator extends WritableComparator {
+
+    public Comparator() {
+      super(GridmixRecord.class);
+    }
+
+    public Comparator(Class<? extends WritableComparable<?>> sub) {
+      super(sub);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+      int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+      n1 -= WritableUtils.getVIntSize(n1);
+      n2 -= WritableUtils.getVIntSize(n2);
+      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
+    }
+
+    static {
+      WritableComparator.define(GridmixRecord.class, new Comparator());
+    }
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixSplit.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+class GridmixSplit extends CombineFileSplit {
+  private int id;
+  private int nSpec;
+  private int maps;
+  private int reduces;
+  private long inputRecords;
+  private long outputBytes;
+  private long outputRecords;
+  private long maxMemory;
+  private double[] reduceBytes = new double[0];
+  private double[] reduceRecords = new double[0];
+
+  // Spec for reduce i is stored at index (i mod nSpec)
+  private long[] reduceOutputBytes = new long[0];
+  private long[] reduceOutputRecords = new long[0];
+
+  GridmixSplit() {
+    super();
+  }
+
+  public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
+      long inputBytes, long inputRecords, long outputBytes,
+      long outputRecords, double[] reduceBytes, double[] reduceRecords,
+      long[] reduceOutputBytes, long[] reduceOutputRecords)
+      throws IOException {
+    super(cfsplit);
+    this.id = id;
+    this.maps = maps;
+    reduces = reduceBytes.length;
+    this.inputRecords = inputRecords;
+    this.outputBytes = outputBytes;
+    this.outputRecords = outputRecords;
+    this.reduceBytes = reduceBytes;
+    this.reduceRecords = reduceRecords;
+    nSpec = reduceOutputBytes.length;
+    this.reduceOutputBytes = reduceOutputBytes;
+    this.reduceOutputRecords = reduceOutputRecords;
+  }
+  public int getId() {
+    return id;
+  }
+  public int getMapCount() {
+    return maps;
+  }
+  public long getInputRecords() {
+    return inputRecords;
+  }
+  public long[] getOutputBytes() {
+    if (0 == reduces) {
+      return new long[] { outputBytes };
+    }
+    final long[] ret = new long[reduces];
+    for (int i = 0; i < reduces; ++i) {
+      ret[i] = Math.round(outputBytes * reduceBytes[i]);
+    }
+    return ret;
+  }
+  public long[] getOutputRecords() {
+    if (0 == reduces) {
+      return new long[] { outputRecords };
+    }
+    final long[] ret = new long[reduces];
+    for (int i = 0; i < reduces; ++i) {
+      ret[i] = Math.round(outputRecords * reduceRecords[i]);
+    }
+    return ret;
+  }
+  public long getReduceBytes(int i) {
+    return reduceOutputBytes[i];
+  }
+  public long getReduceRecords(int i) {
+    return reduceOutputRecords[i];
+  }
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    WritableUtils.writeVInt(out, id);
+    WritableUtils.writeVInt(out, maps);
+    WritableUtils.writeVLong(out, inputRecords);
+    WritableUtils.writeVLong(out, outputBytes);
+    WritableUtils.writeVLong(out, outputRecords);
+    WritableUtils.writeVLong(out, maxMemory);
+    WritableUtils.writeVInt(out, reduces);
+    for (int i = 0; i < reduces; ++i) {
+      out.writeDouble(reduceBytes[i]);
+      out.writeDouble(reduceRecords[i]);
+    }
+    WritableUtils.writeVInt(out, nSpec);
+    for (int i = 0; i < nSpec; ++i) {
+      WritableUtils.writeVLong(out, reduceOutputBytes[i]);
+      WritableUtils.writeVLong(out, reduceOutputRecords[i]);
+    }
+  }
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    id = WritableUtils.readVInt(in);
+    maps = WritableUtils.readVInt(in);
+    inputRecords = WritableUtils.readVLong(in);
+    outputBytes = WritableUtils.readVLong(in);
+    outputRecords = WritableUtils.readVLong(in);
+    maxMemory = WritableUtils.readVLong(in);
+    reduces = WritableUtils.readVInt(in);
+    if (reduceBytes.length < reduces) {
+      reduceBytes = new double[reduces];
+      reduceRecords = new double[reduces];
+    }
+    for (int i = 0; i < reduces; ++i) {
+      reduceBytes[i] = in.readDouble();
+      reduceRecords[i] = in.readDouble();
+    }
+    nSpec = WritableUtils.readVInt(in);
+    if (reduceOutputBytes.length < nSpec) {
+      reduceOutputBytes = new long[nSpec];
+      reduceOutputRecords = new long[nSpec];
+    }
+    for (int i = 0; i < nSpec; ++i) {
+      reduceOutputBytes[i] = WritableUtils.readVLong(in);
+      reduceOutputRecords[i] = WritableUtils.readVLong(in);
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Given a {@link FilePool}, obtain a set of files capable of satisfying
+ * a full set of splits, then iterate over each source to fill the request.
+ */
+class InputStriper {
+  public static final Log LOG = LogFactory.getLog(InputStriper.class);
+  int idx;
+  long currentStart;
+  FileStatus current;
+  final List<FileStatus> files = new ArrayList<FileStatus>();
+
+  /**
+   * @param inputDir Pool from which files are requested.
+   * @param mapBytes Sum of all expected split requests.
+   */
+  InputStriper(FilePool inputDir, long mapBytes)
+      throws IOException {
+    final long inputBytes = inputDir.getInputFiles(mapBytes, files);
+    if (mapBytes > inputBytes) {
+      LOG.warn("Using " + inputBytes + "/" + mapBytes + " bytes");
+    }
+    if (files.isEmpty() && mapBytes > 0) {
+      throw new IOException("Failed to satisfy request for " + mapBytes);
+    }
+    current = files.isEmpty() ? null : files.get(0);
+  }
+
+  /**
+   * @param inputDir Pool used to resolve block locations.
+   * @param bytes Target byte count.
+   * @param nLocs Number of block locations per split.
+   * @return A set of files satisfying the byte count, with locations weighted
+   *   to the dominating proportion of input bytes.
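+   *   For example, if the blocks backing a split hold 60% of its bytes on
+   *   host A and 40% on host B, host A is ranked first (figures are
+   *   illustrative only).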
+   */
+  CombineFileSplit splitFor(FilePool inputDir, long bytes, int nLocs)
+      throws IOException {
+    final ArrayList<Path> paths = new ArrayList<Path>();
+    final ArrayList<Long> start = new ArrayList<Long>();
+    final ArrayList<Long> length = new ArrayList<Long>();
+    final HashMap<String,Double> sb = new HashMap<String,Double>();
+    do {
+      paths.add(current.getPath());
+      start.add(currentStart);
+      final long fromFile = Math.min(bytes, current.getLen() - currentStart);
+      length.add(fromFile);
+      for (BlockLocation loc :
+          inputDir.locationsFor(current, currentStart, fromFile)) {
+        final double tedium = loc.getLength() / (1.0 * bytes);
+        for (String l : loc.getHosts()) {
+          Double j = sb.get(l);
+          if (null == j) {
+            sb.put(l, tedium);
+          } else {
+            sb.put(l, j.doubleValue() + tedium);
+          }
+        }
+      }
+      currentStart += fromFile;
+      bytes -= fromFile;
+      if (current.getLen() - currentStart == 0) {
+        current = files.get(++idx % files.size());
+        currentStart = 0;
+      }
+    } while (bytes > 0);
+    final ArrayList<Entry<String,Double>> sort =
+      new ArrayList<Entry<String,Double>>(sb.entrySet());
+    Collections.sort(sort, hostRank);
+    final String[] hosts = new String[Math.min(nLocs, sort.size())];
+    for (int i = 0; i < nLocs && i < sort.size(); ++i) {
+      hosts[i] = sort.get(i).getKey();
+    }
+    return new CombineFileSplit(paths.toArray(new Path[0]),
+        toLongArray(start), toLongArray(length), hosts);
+  }
+
+  private long[] toLongArray(final ArrayList<Long> sigh) {
+    final long[] ret = new long[sigh.size()];
+    for (int i = 0; i < ret.length; ++i) {
+      ret[i] = sigh.get(i);
+    }
+    return ret;
+  }
+
+  static final Comparator<Entry<String,Double>> hostRank =
+    new Comparator<Entry<String,Double>>() {
+      public int compare(Entry<String,Double> a, Entry<String,Double> b) {
+        final double va = a.getValue();
+        final double vb = b.getValue();
+        return va > vb ? -1 : va < vb ? 1 : 0;
+      }
+    };
+}

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java?rev=889778&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/IntermediateRecordFactory.java Fri Dec 11 19:32:58 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Factory passing reduce specification as its last record.
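+ * Records are tagged DATA until the target record count is reached; the
+ * last record is tagged REDUCE_SPEC and carries the record and byte counts
+ * the matching reduce is expected to reproduce.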
+ */
+class IntermediateRecordFactory extends RecordFactory {
+
+  private final GridmixKey.Spec spec;
+  private final RecordFactory factory;
+  private final int partition;
+  private final long targetRecords;
+  private boolean done = false;
+  private long accRecords = 0L;
+
+  /**
+   * @param targetBytes Expected byte count.
+   * @param targetRecords Expected record count; will emit spec records after
+   *   this boundary is passed.
+   * @param partition Reduce to which records are emitted.
+   * @param spec Specification to emit.
+   * @param conf Unused.
+   */
+  public IntermediateRecordFactory(long targetBytes, long targetRecords,
+      int partition, GridmixKey.Spec spec, Configuration conf) {
+    this(new AvgRecordFactory(targetBytes, targetRecords, conf), partition,
+        targetRecords, spec, conf);
+  }
+
+  /**
+   * @param factory Factory from which byte/record counts are obtained.
+   * @param partition Reduce to which records are emitted.
+   * @param targetRecords Expected record count; will emit spec records after
+   *   this boundary is passed.
+   * @param spec Specification to emit.
+   * @param conf Unused.
+   */
+  public IntermediateRecordFactory(RecordFactory factory, int partition,
+      long targetRecords, GridmixKey.Spec spec, Configuration conf) {
+    this.spec = spec;
+    this.factory = factory;
+    this.partition = partition;
+    this.targetRecords = targetRecords;
+  }
+
+  @Override
+  public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
+    assert key != null;
+    final boolean rslt = factory.next(key, val);
+    ++accRecords;
+    if (rslt) {
+      if (accRecords < targetRecords) {
+        key.setType(GridmixKey.DATA);
+      } else {
+        final int orig = key.getSize();
+        key.setType(GridmixKey.REDUCE_SPEC);
+        spec.rec_in = accRecords;
+        key.setSpec(spec);
+        val.setSize(val.getSize() - (key.getSize() - orig));
+        // reset counters
+        accRecords = 0L;
+        spec.bytes_out = 0L;
+        spec.rec_out = 0L;
+        done = true;
+      }
+    } else if (!done) {
+      // ensure spec emitted
+      key.setType(GridmixKey.REDUCE_SPEC);
+      key.setPartition(partition);
+      key.setSize(0);
+      val.setSize(0);
+      spec.rec_in = 0L;
+      key.setSpec(spec);
+      done = true;
+      return true;
+    }
+    key.setPartition(partition);
+    return rslt;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return factory.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    factory.close();
+  }
+}
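The property these classes are built around, and the heart of the byte-count
fix, is that getSize() always equals the number of bytes write() actually
emits. A minimal sketch of that invariant, assuming the classes above are
compiled and the snippet sits in the same package (they are package-private);
the class name is hypothetical and the snippet is not part of this commit:

    package org.apache.hadoop.mapred.gridmix;

    import org.apache.hadoop.io.DataOutputBuffer;

    // Hypothetical check, not part of this commit.
    public class SizeInvariantCheck {
      public static void main(String[] args) throws Exception {
        // A DATA key of nominal size 256: one metadata byte for the type,
        // plus the GridmixRecord payload (vint-encoded length + bytes).
        final GridmixKey key = new GridmixKey(GridmixKey.DATA, 256, 42L);
        final DataOutputBuffer out = new DataOutputBuffer();
        key.write(out);
        // write() emits the vint length, the payload, and the type byte;
        // together they must total exactly getSize() bytes.
        System.out.println(out.getLength() == key.getSize()); // prints true
      }
    }

The same round trip holds for REDUCE_SPEC keys, where getSize() additionally
counts the vint-encoded spec fields.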