Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D352410C2C for ; Tue, 24 Sep 2013 22:46:29 +0000 (UTC) Received: (qmail 47303 invoked by uid 500); 24 Sep 2013 22:46:02 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 47033 invoked by uid 500); 24 Sep 2013 22:45:45 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 46769 invoked by uid 99); 24 Sep 2013 22:45:06 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Sep 2013 22:45:06 +0000 X-ASF-Spam-Status: No, hits=-2002.3 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 24 Sep 2013 22:44:53 +0000 Received: (qmail 43882 invoked by uid 99); 24 Sep 2013 22:44:12 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Sep 2013 22:44:12 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7FE629091FC; Tue, 24 Sep 2013 22:44:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.incubator.apache.org Date: Tue, 24 Sep 2013 22:44:21 -0000 Message-Id: <9c38084f97fc4c89b0e0a7f0479e3409@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [12/20] Rename tez-engine-api to tez-runtime-api and tez-engine is split into 2: - tez-engine-library for user-visible Input/Output/Processor implementations - tez-engine-internals for framework internals X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java deleted file mode 100644 index 6b48270..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java +++ /dev/null @@ -1,1108 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.engine.common.sort.impl.dflt; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.IntBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.util.IndexedSortable; -import org.apache.hadoop.util.Progress; -import org.apache.hadoop.util.StringUtils; -import org.apache.tez.common.TezJobConfig; -import org.apache.tez.engine.api.TezOutputContext; -import org.apache.tez.engine.common.ConfigUtils; -import org.apache.tez.engine.common.sort.impl.ExternalSorter; -import org.apache.tez.engine.common.sort.impl.IFile; -import org.apache.tez.engine.common.sort.impl.TezIndexRecord; -import org.apache.tez.engine.common.sort.impl.TezMerger; -import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; -import org.apache.tez.engine.common.sort.impl.TezSpillRecord; -import org.apache.tez.engine.common.sort.impl.IFile.Writer; -import org.apache.tez.engine.common.sort.impl.TezMerger.Segment; - -@SuppressWarnings({"unchecked", "rawtypes"}) -public class DefaultSorter extends ExternalSorter implements IndexedSortable { - - private static final Log LOG = LogFactory.getLog(DefaultSorter.class); - - // TODO NEWTEZ Progress reporting to Tez framework. (making progress vs %complete) - - /** - * The size of each record in the index file for the map-outputs. - */ - public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24; - - private final static int APPROX_HEADER_LENGTH = 150; - - // k/v accounting - IntBuffer kvmeta; // metadata overlay on backing store - int kvstart; // marks origin of spill metadata - int kvend; // marks end of spill metadata - int kvindex; // marks end of fully serialized records - - int equator; // marks origin of meta/serialization - int bufstart; // marks beginning of spill - int bufend; // marks beginning of collectable - int bufmark; // marks end of record - int bufindex; // marks end of collected - int bufvoid; // marks the point where we should stop - // reading at the end of the buffer - - byte[] kvbuffer; // main output buffer - private final byte[] b0 = new byte[0]; - - protected static final int INDEX = 0; // index offset in acct - protected static final int VALSTART = 1; // val offset in acct - protected static final int KEYSTART = 2; // key offset in acct - protected static final int PARTITION = 3; // partition offset in acct - protected static final int NMETA = 4; // num meta ints - protected static final int METASIZE = NMETA * 4; // size in bytes - - // spill accounting - int maxRec; - int softLimit; - boolean spillInProgress; - int bufferRemaining; - volatile Throwable sortSpillException = null; - - int numSpills = 0; - int minSpillsForCombine; - final ReentrantLock spillLock = new ReentrantLock(); - final Condition spillDone = spillLock.newCondition(); - final Condition spillReady = spillLock.newCondition(); - final BlockingBuffer bb = new BlockingBuffer(); - volatile boolean spillThreadRunning = false; - final SpillThread spillThread = new SpillThread(); - - final ArrayList indexCacheList = - new ArrayList(); - private int totalIndexCacheMemory; - private int indexCacheMemoryLimit; - - @Override - public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException { - super.initialize(outputContext, conf, numOutputs); - - // sanity checks - final float spillper = this.conf.getFloat( - TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT, - TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT); - final int sortmb = this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_MB, - TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB); - if (spillper > (float) 1.0 || spillper <= (float) 0.0) { - throw new IOException("Invalid \"" - + TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT + "\": " + spillper); - } - if ((sortmb & 0x7FF) != sortmb) { - throw new IOException("Invalid \"" + TezJobConfig.TEZ_ENGINE_IO_SORT_MB - + "\": " + sortmb); - } - - indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES, - TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES); - - // buffers and accounting - int maxMemUsage = sortmb << 20; - maxMemUsage -= maxMemUsage % METASIZE; - kvbuffer = new byte[maxMemUsage]; - bufvoid = kvbuffer.length; - kvmeta = ByteBuffer.wrap(kvbuffer) - .order(ByteOrder.nativeOrder()) - .asIntBuffer(); - setEquator(0); - bufstart = bufend = bufindex = equator; - kvstart = kvend = kvindex; - - maxRec = kvmeta.capacity() / NMETA; - softLimit = (int)(kvbuffer.length * spillper); - bufferRemaining = softLimit; - if (LOG.isInfoEnabled()) { - LOG.info(TezJobConfig.TEZ_ENGINE_IO_SORT_MB + ": " + sortmb); - LOG.info("soft limit at " + softLimit); - LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid); - LOG.info("kvstart = " + kvstart + "; length = " + maxRec); - } - - // k/v serialization - valSerializer.open(bb); - keySerializer.open(bb); - - spillInProgress = false; - minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3); - spillThread.setDaemon(true); - spillThread.setName("SpillThread"); - spillLock.lock(); - try { - spillThread.start(); - while (!spillThreadRunning) { - spillDone.await(); - } - } catch (InterruptedException e) { - throw new IOException("Spill thread failed to initialize", e); - } finally { - spillLock.unlock(); - } - if (sortSpillException != null) { - throw new IOException("Spill thread failed to initialize", - sortSpillException); - } - } - - @Override - public void write(Object key, Object value) - throws IOException { - collect( - key, value, partitioner.getPartition(key, value, partitions)); - } - - /** - * Serialize the key, value to intermediate storage. - * When this method returns, kvindex must refer to sufficient unused - * storage to store one METADATA. - */ - synchronized void collect(Object key, Object value, final int partition - ) throws IOException { - - if (key.getClass() != keyClass) { - throw new IOException("Type mismatch in key from map: expected " - + keyClass.getName() + ", received " - + key.getClass().getName()); - } - if (value.getClass() != valClass) { - throw new IOException("Type mismatch in value from map: expected " - + valClass.getName() + ", received " - + value.getClass().getName()); - } - if (partition < 0 || partition >= partitions) { - throw new IOException("Illegal partition for " + key + " (" + - partition + ")" + ", TotalPartitions: " + partitions); - } - checkSpillException(); - bufferRemaining -= METASIZE; - if (bufferRemaining <= 0) { - // start spill if the thread is not running and the soft limit has been - // reached - spillLock.lock(); - try { - do { - if (!spillInProgress) { - final int kvbidx = 4 * kvindex; - final int kvbend = 4 * kvend; - // serialized, unspilled bytes always lie between kvindex and - // bufindex, crossing the equator. Note that any void space - // created by a reset must be included in "used" bytes - final int bUsed = distanceTo(kvbidx, bufindex); - final boolean bufsoftlimit = bUsed >= softLimit; - if ((kvbend + METASIZE) % kvbuffer.length != - equator - (equator % METASIZE)) { - // spill finished, reclaim space - resetSpill(); - bufferRemaining = Math.min( - distanceTo(bufindex, kvbidx) - 2 * METASIZE, - softLimit - bUsed) - METASIZE; - continue; - } else if (bufsoftlimit && kvindex != kvend) { - // spill records, if any collected; check latter, as it may - // be possible for metadata alignment to hit spill pcnt - startSpill(); - final int avgRec = (int) - (mapOutputByteCounter.getValue() / - mapOutputRecordCounter.getValue()); - // leave at least half the split buffer for serialization data - // ensure that kvindex >= bufindex - final int distkvi = distanceTo(bufindex, kvbidx); - final int newPos = (bufindex + - Math.max(2 * METASIZE - 1, - Math.min(distkvi / 2, - distkvi / (METASIZE + avgRec) * METASIZE))) - % kvbuffer.length; - setEquator(newPos); - bufmark = bufindex = newPos; - final int serBound = 4 * kvend; - // bytes remaining before the lock must be held and limits - // checked is the minimum of three arcs: the metadata space, the - // serialization space, and the soft limit - bufferRemaining = Math.min( - // metadata max - distanceTo(bufend, newPos), - Math.min( - // serialization max - distanceTo(newPos, serBound), - // soft limit - softLimit)) - 2 * METASIZE; - } - } - } while (false); - } finally { - spillLock.unlock(); - } - } - - try { - // serialize key bytes into buffer - int keystart = bufindex; - keySerializer.serialize(key); - if (bufindex < keystart) { - // wrapped the key; must make contiguous - bb.shiftBufferedKey(); - keystart = 0; - } - // serialize value bytes into buffer - final int valstart = bufindex; - valSerializer.serialize(value); - // It's possible for records to have zero length, i.e. the serializer - // will perform no writes. To ensure that the boundary conditions are - // checked and that the kvindex invariant is maintained, perform a - // zero-length write into the buffer. The logic monitoring this could be - // moved into collect, but this is cleaner and inexpensive. For now, it - // is acceptable. - bb.write(b0, 0, 0); - - // the record must be marked after the preceding write, as the metadata - // for this record are not yet written - int valend = bb.markRecord(); - - mapOutputRecordCounter.increment(1); - mapOutputByteCounter.increment( - distanceTo(keystart, valend, bufvoid)); - - // write accounting info - kvmeta.put(kvindex + INDEX, kvindex); - kvmeta.put(kvindex + PARTITION, partition); - kvmeta.put(kvindex + KEYSTART, keystart); - kvmeta.put(kvindex + VALSTART, valstart); - // advance kvindex - kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity(); - } catch (MapBufferTooSmallException e) { - LOG.info("Record too large for in-memory buffer: " + e.getMessage()); - spillSingleRecord(key, value, partition); - mapOutputRecordCounter.increment(1); - return; - } - } - - /** - * Set the point from which meta and serialization data expand. The meta - * indices are aligned with the buffer, so metadata never spans the ends of - * the circular buffer. - */ - private void setEquator(int pos) { - equator = pos; - // set index prior to first entry, aligned at meta boundary - final int aligned = pos - (pos % METASIZE); - kvindex = - ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4; - if (LOG.isInfoEnabled()) { - LOG.info("(EQUATOR) " + pos + " kvi " + kvindex + - "(" + (kvindex * 4) + ")"); - } - } - - /** - * The spill is complete, so set the buffer and meta indices to be equal to - * the new equator to free space for continuing collection. Note that when - * kvindex == kvend == kvstart, the buffer is empty. - */ - private void resetSpill() { - final int e = equator; - bufstart = bufend = e; - final int aligned = e - (e % METASIZE); - // set start/end to point to first meta record - kvstart = kvend = - ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4; - if (LOG.isInfoEnabled()) { - LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" + - (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")"); - } - } - - /** - * Compute the distance in bytes between two indices in the serialization - * buffer. - * @see #distanceTo(int,int,int) - */ - final int distanceTo(final int i, final int j) { - return distanceTo(i, j, kvbuffer.length); - } - - /** - * Compute the distance between two indices in the circular buffer given the - * max distance. - */ - int distanceTo(final int i, final int j, final int mod) { - return i <= j - ? j - i - : mod - i + j; - } - - /** - * For the given meta position, return the dereferenced position in the - * integer array. Each meta block contains several integers describing - * record data in its serialized form, but the INDEX is not necessarily - * related to the proximate metadata. The index value at the referenced int - * position is the start offset of the associated metadata block. So the - * metadata INDEX at metapos may point to the metadata described by the - * metadata block at metapos + k, which contains information about that - * serialized record. - */ - int offsetFor(int metapos) { - return kvmeta.get((metapos % maxRec) * NMETA + INDEX); - } - - /** - * Compare logical range, st i, j MOD offset capacity. - * Compare by partition, then by key. - * @see IndexedSortable#compare - */ - public int compare(final int mi, final int mj) { - final int kvi = offsetFor(mi); - final int kvj = offsetFor(mj); - final int kvip = kvmeta.get(kvi + PARTITION); - final int kvjp = kvmeta.get(kvj + PARTITION); - // sort by partition - if (kvip != kvjp) { - return kvip - kvjp; - } - // sort by key - return comparator.compare(kvbuffer, - kvmeta.get(kvi + KEYSTART), - kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART), - kvbuffer, - kvmeta.get(kvj + KEYSTART), - kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART)); - } - - /** - * Swap logical indices st i, j MOD offset capacity. - * @see IndexedSortable#swap - */ - public void swap(final int mi, final int mj) { - final int kvi = (mi % maxRec) * NMETA + INDEX; - final int kvj = (mj % maxRec) * NMETA + INDEX; - int tmp = kvmeta.get(kvi); - kvmeta.put(kvi, kvmeta.get(kvj)); - kvmeta.put(kvj, tmp); - } - - /** - * Inner class managing the spill of serialized records to disk. - */ - protected class BlockingBuffer extends DataOutputStream { - - public BlockingBuffer() { - super(new Buffer()); - } - - /** - * Mark end of record. Note that this is required if the buffer is to - * cut the spill in the proper place. - */ - public int markRecord() { - bufmark = bufindex; - return bufindex; - } - - /** - * Set position from last mark to end of writable buffer, then rewrite - * the data between last mark and kvindex. - * This handles a special case where the key wraps around the buffer. - * If the key is to be passed to a RawComparator, then it must be - * contiguous in the buffer. This recopies the data in the buffer back - * into itself, but starting at the beginning of the buffer. Note that - * this method should only be called immediately after detecting - * this condition. To call it at any other time is undefined and would - * likely result in data loss or corruption. - * @see #markRecord() - */ - protected void shiftBufferedKey() throws IOException { - // spillLock unnecessary; both kvend and kvindex are current - int headbytelen = bufvoid - bufmark; - bufvoid = bufmark; - final int kvbidx = 4 * kvindex; - final int kvbend = 4 * kvend; - final int avail = - Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend)); - if (bufindex + headbytelen < avail) { - System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex); - System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen); - bufindex += headbytelen; - bufferRemaining -= kvbuffer.length - bufvoid; - } else { - byte[] keytmp = new byte[bufindex]; - System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex); - bufindex = 0; - out.write(kvbuffer, bufmark, headbytelen); - out.write(keytmp); - } - } - } - - public class Buffer extends OutputStream { - private final byte[] scratch = new byte[1]; - - @Override - public void write(int v) - throws IOException { - scratch[0] = (byte)v; - write(scratch, 0, 1); - } - - /** - * Attempt to write a sequence of bytes to the collection buffer. - * This method will block if the spill thread is running and it - * cannot write. - * @throws MapBufferTooSmallException if record is too large to - * deserialize into the collection buffer. - */ - @Override - public void write(byte b[], int off, int len) - throws IOException { - // must always verify the invariant that at least METASIZE bytes are - // available beyond kvindex, even when len == 0 - bufferRemaining -= len; - if (bufferRemaining <= 0) { - // writing these bytes could exhaust available buffer space or fill - // the buffer to soft limit. check if spill or blocking are necessary - boolean blockwrite = false; - spillLock.lock(); - try { - do { - checkSpillException(); - - final int kvbidx = 4 * kvindex; - final int kvbend = 4 * kvend; - // ser distance to key index - final int distkvi = distanceTo(bufindex, kvbidx); - // ser distance to spill end index - final int distkve = distanceTo(bufindex, kvbend); - - // if kvindex is closer than kvend, then a spill is neither in - // progress nor complete and reset since the lock was held. The - // write should block only if there is insufficient space to - // complete the current write, write the metadata for this record, - // and write the metadata for the next record. If kvend is closer, - // then the write should block if there is too little space for - // either the metadata or the current write. Note that collect - // ensures its metadata requirement with a zero-length write - blockwrite = distkvi <= distkve - ? distkvi <= len + 2 * METASIZE - : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE; - - if (!spillInProgress) { - if (blockwrite) { - if ((kvbend + METASIZE) % kvbuffer.length != - equator - (equator % METASIZE)) { - // spill finished, reclaim space - // need to use meta exclusively; zero-len rec & 100% spill - // pcnt would fail - resetSpill(); // resetSpill doesn't move bufindex, kvindex - bufferRemaining = Math.min( - distkvi - 2 * METASIZE, - softLimit - distanceTo(kvbidx, bufindex)) - len; - continue; - } - // we have records we can spill; only spill if blocked - if (kvindex != kvend) { - startSpill(); - // Blocked on this write, waiting for the spill just - // initiated to finish. Instead of repositioning the marker - // and copying the partial record, we set the record start - // to be the new equator - setEquator(bufmark); - } else { - // We have no buffered records, and this record is too large - // to write into kvbuffer. We must spill it directly from - // collect - final int size = distanceTo(bufstart, bufindex) + len; - setEquator(0); - bufstart = bufend = bufindex = equator; - kvstart = kvend = kvindex; - bufvoid = kvbuffer.length; - throw new MapBufferTooSmallException(size + " bytes"); - } - } - } - - if (blockwrite) { - // wait for spill - try { - while (spillInProgress) { - spillDone.await(); - } - } catch (InterruptedException e) { - throw new IOException( - "Buffer interrupted while waiting for the writer", e); - } - } - } while (blockwrite); - } finally { - spillLock.unlock(); - } - } - // here, we know that we have sufficient space to write - if (bufindex + len > bufvoid) { - final int gaplen = bufvoid - bufindex; - System.arraycopy(b, off, kvbuffer, bufindex, gaplen); - len -= gaplen; - off += gaplen; - bufindex = 0; - } - System.arraycopy(b, off, kvbuffer, bufindex, len); - bufindex += len; - } - } - - @Override - public void flush() throws IOException { - LOG.info("Starting flush of map output"); - spillLock.lock(); - try { - while (spillInProgress) { - spillDone.await(); - } - checkSpillException(); - - final int kvbend = 4 * kvend; - if ((kvbend + METASIZE) % kvbuffer.length != - equator - (equator % METASIZE)) { - // spill finished - resetSpill(); - } - if (kvindex != kvend) { - kvend = (kvindex + NMETA) % kvmeta.capacity(); - bufend = bufmark; - if (LOG.isInfoEnabled()) { - LOG.info("Sorting & Spilling map output"); - LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark + - "; bufvoid = " + bufvoid); - LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) + - "); kvend = " + kvend + "(" + (kvend * 4) + - "); length = " + (distanceTo(kvend, kvstart, - kvmeta.capacity()) + 1) + "/" + maxRec); - } - sortAndSpill(); - } - } catch (InterruptedException e) { - throw new IOException("Interrupted while waiting for the writer", e); - } finally { - spillLock.unlock(); - } - assert !spillLock.isHeldByCurrentThread(); - // shut down spill thread and wait for it to exit. Since the preceding - // ensures that it is finished with its work (and sortAndSpill did not - // throw), we elect to use an interrupt instead of setting a flag. - // Spilling simultaneously from this thread while the spill thread - // finishes its work might be both a useful way to extend this and also - // sufficient motivation for the latter approach. - try { - spillThread.interrupt(); - spillThread.join(); - } catch (InterruptedException e) { - throw new IOException("Spill failed", e); - } - // release sort buffer before the merge - //FIXME - //kvbuffer = null; - mergeParts(); - Path outputPath = mapOutputFile.getOutputFile(); - fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen()); - } - - @Override - public void close() throws IOException { } - - protected class SpillThread extends Thread { - - @Override - public void run() { - spillLock.lock(); - spillThreadRunning = true; - try { - while (true) { - spillDone.signal(); - while (!spillInProgress) { - spillReady.await(); - } - try { - spillLock.unlock(); - sortAndSpill(); - } catch (Throwable t) { - LOG.warn("Got an exception in sortAndSpill", t); - sortSpillException = t; - } finally { - spillLock.lock(); - if (bufend < bufstart) { - bufvoid = kvbuffer.length; - } - kvstart = kvend; - bufstart = bufend; - spillInProgress = false; - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - spillLock.unlock(); - spillThreadRunning = false; - } - } - } - - private void checkSpillException() throws IOException { - final Throwable lspillException = sortSpillException; - if (lspillException != null) { - if (lspillException instanceof Error) { - final String logMsg = "Task " + outputContext.getUniqueIdentifier() - + " failed : " + StringUtils.stringifyException(lspillException); - outputContext.fatalError(lspillException, logMsg); - } - throw new IOException("Spill failed", lspillException); - } - } - - private void startSpill() { - assert !spillInProgress; - kvend = (kvindex + NMETA) % kvmeta.capacity(); - bufend = bufmark; - spillInProgress = true; - if (LOG.isInfoEnabled()) { - LOG.info("Spilling map output"); - LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark + - "; bufvoid = " + bufvoid); - LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) + - "); kvend = " + kvend + "(" + (kvend * 4) + - "); length = " + (distanceTo(kvend, kvstart, - kvmeta.capacity()) + 1) + "/" + maxRec); - } - spillReady.signal(); - } - - int getMetaStart() { - return kvend / NMETA; - } - - int getMetaEnd() { - return 1 + // kvend is a valid record - (kvstart >= kvend - ? kvstart - : kvmeta.capacity() + kvstart) / NMETA; - } - - protected void sortAndSpill() - throws IOException, InterruptedException { - final int mstart = getMetaStart(); - final int mend = getMetaEnd(); - sorter.sort(this, mstart, mend, nullProgressable); - spill(mstart, mend); - } - - protected void spill(int mstart, int mend) - throws IOException, InterruptedException { - - //approximate the length of the output file to be the length of the - //buffer + header lengths for the partitions - final long size = (bufend >= bufstart - ? bufend - bufstart - : (bufvoid - bufend) + bufstart) + - partitions * APPROX_HEADER_LENGTH; - FSDataOutputStream out = null; - try { - // create spill file - final TezSpillRecord spillRec = new TezSpillRecord(partitions); - final Path filename = - mapOutputFile.getSpillFileForWrite(numSpills, size); - out = rfs.create(filename); - - int spindex = mstart; - final InMemValBytes value = createInMemValBytes(); - for (int i = 0; i < partitions; ++i) { - IFile.Writer writer = null; - try { - long segmentStart = out.getPos(); - writer = new Writer(conf, out, keyClass, valClass, codec, - spilledRecordsCounter); - if (combiner == null) { - // spill directly - DataInputBuffer key = new DataInputBuffer(); - while (spindex < mend && - kvmeta.get(offsetFor(spindex) + PARTITION) == i) { - final int kvoff = offsetFor(spindex); - key.reset( - kvbuffer, - kvmeta.get(kvoff + KEYSTART), - (kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART)) - ); - getVBytesForOffset(kvoff, value); - writer.append(key, value); - ++spindex; - } - } else { - int spstart = spindex; - while (spindex < mend && - kvmeta.get(offsetFor(spindex) - + PARTITION) == i) { - ++spindex; - } - // Note: we would like to avoid the combiner if we've fewer - // than some threshold of records for a partition - if (spstart != spindex) { - TezRawKeyValueIterator kvIter = - new MRResultIterator(spstart, spindex); - if (LOG.isDebugEnabled()) { - LOG.debug("Running combine processor"); - } - runCombineProcessor(kvIter, writer); - } - } - - // close the writer - writer.close(); - - // record offsets - final TezIndexRecord rec = - new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); - spillRec.putIndex(rec, i); - - writer = null; - } finally { - if (null != writer) writer.close(); - } - } - - if (totalIndexCacheMemory >= indexCacheMemoryLimit) { - // create spill index file - Path indexFilename = - mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions - * MAP_OUTPUT_INDEX_RECORD_LENGTH); - spillRec.writeToFile(indexFilename, conf); - } else { - indexCacheList.add(spillRec); - totalIndexCacheMemory += - spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; - } - LOG.info("Finished spill " + numSpills); - ++numSpills; - } finally { - if (out != null) out.close(); - } - } - - /** - * Handles the degenerate case where serialization fails to fit in - * the in-memory buffer, so we must spill the record from collect - * directly to a spill file. Consider this "losing". - */ - private void spillSingleRecord(final Object key, final Object value, - int partition) throws IOException { - long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH; - FSDataOutputStream out = null; - try { - // create spill file - final TezSpillRecord spillRec = new TezSpillRecord(partitions); - final Path filename = - mapOutputFile.getSpillFileForWrite(numSpills, size); - out = rfs.create(filename); - - // we don't run the combiner for a single record - for (int i = 0; i < partitions; ++i) { - IFile.Writer writer = null; - try { - long segmentStart = out.getPos(); - // Create a new codec, don't care! - writer = new IFile.Writer(conf, out, keyClass, valClass, codec, - spilledRecordsCounter); - - if (i == partition) { - final long recordStart = out.getPos(); - writer.append(key, value); - // Note that our map byte count will not be accurate with - // compression - mapOutputByteCounter.increment(out.getPos() - recordStart); - } - writer.close(); - - // record offsets - TezIndexRecord rec = - new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); - spillRec.putIndex(rec, i); - - writer = null; - } catch (IOException e) { - if (null != writer) writer.close(); - throw e; - } - } - if (totalIndexCacheMemory >= indexCacheMemoryLimit) { - // create spill index file - Path indexFilename = - mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions - * MAP_OUTPUT_INDEX_RECORD_LENGTH); - spillRec.writeToFile(indexFilename, conf); - } else { - indexCacheList.add(spillRec); - totalIndexCacheMemory += - spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; - } - ++numSpills; - } finally { - if (out != null) out.close(); - } - } - - protected int getInMemVBytesLength(int kvoff) { - // get the keystart for the next serialized value to be the end - // of this value. If this is the last value in the buffer, use bufend - final int nextindex = kvoff == kvend - ? bufend - : kvmeta.get( - (kvoff - NMETA + kvmeta.capacity() + KEYSTART) % kvmeta.capacity()); - // calculate the length of the value - int vallen = (nextindex >= kvmeta.get(kvoff + VALSTART)) - ? nextindex - kvmeta.get(kvoff + VALSTART) - : (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex; - return vallen; - } - - /** - * Given an offset, populate vbytes with the associated set of - * deserialized value bytes. Should only be called during a spill. - */ - int getVBytesForOffset(int kvoff, InMemValBytes vbytes) { - int vallen = getInMemVBytesLength(kvoff); - vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen); - return vallen; - } - - /** - * Inner class wrapping valuebytes, used for appendRaw. - */ - static class InMemValBytes extends DataInputBuffer { - private byte[] buffer; - private int start; - private int length; - private final int bufvoid; - - public InMemValBytes(int bufvoid) { - this.bufvoid = bufvoid; - } - - public void reset(byte[] buffer, int start, int length) { - this.buffer = buffer; - this.start = start; - this.length = length; - - if (start + length > bufvoid) { - this.buffer = new byte[this.length]; - final int taillen = bufvoid - start; - System.arraycopy(buffer, start, this.buffer, 0, taillen); - System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen); - this.start = 0; - } - - super.reset(this.buffer, this.start, this.length); - } - } - - InMemValBytes createInMemValBytes() { - return new InMemValBytes(bufvoid); - } - - protected class MRResultIterator implements TezRawKeyValueIterator { - private final DataInputBuffer keybuf = new DataInputBuffer(); - private final InMemValBytes vbytes = createInMemValBytes(); - private final int end; - private int current; - public MRResultIterator(int start, int end) { - this.end = end; - current = start - 1; - } - public boolean next() throws IOException { - return ++current < end; - } - public DataInputBuffer getKey() throws IOException { - final int kvoff = offsetFor(current); - keybuf.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART), - kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART)); - return keybuf; - } - public DataInputBuffer getValue() throws IOException { - getVBytesForOffset(offsetFor(current), vbytes); - return vbytes; - } - public Progress getProgress() { - return null; - } - public void close() { } - } - - private void mergeParts() throws IOException { - // get the approximate size of the final output/index files - long finalOutFileSize = 0; - long finalIndexFileSize = 0; - final Path[] filename = new Path[numSpills]; - final String taskIdentifier = outputContext.getUniqueIdentifier(); - - for(int i = 0; i < numSpills; i++) { - filename[i] = mapOutputFile.getSpillFile(i); - finalOutFileSize += rfs.getFileStatus(filename[i]).getLen(); - } - if (numSpills == 1) { //the spill is the final output - sameVolRename(filename[0], - mapOutputFile.getOutputFileForWriteInVolume(filename[0])); - if (indexCacheList.size() == 0) { - sameVolRename(mapOutputFile.getSpillIndexFile(0), - mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0])); - } else { - indexCacheList.get(0).writeToFile( - mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf); - } - return; - } - - // read in paged indices - for (int i = indexCacheList.size(); i < numSpills; ++i) { - Path indexFileName = mapOutputFile.getSpillIndexFile(i); - indexCacheList.add(new TezSpillRecord(indexFileName, conf)); - } - - //make correction in the length to include the sequence file header - //lengths for each partition - finalOutFileSize += partitions * APPROX_HEADER_LENGTH; - finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH; - Path finalOutputFile = - mapOutputFile.getOutputFileForWrite(finalOutFileSize); - Path finalIndexFile = - mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); - - //The output stream for the final single output file - FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); - - if (numSpills == 0) { - //create dummy files - - TezSpillRecord sr = new TezSpillRecord(partitions); - try { - for (int i = 0; i < partitions; i++) { - long segmentStart = finalOut.getPos(); - Writer writer = - new Writer(conf, finalOut, keyClass, valClass, codec, null); - writer.close(); - - TezIndexRecord rec = - new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); - sr.putIndex(rec, i); - } - sr.writeToFile(finalIndexFile, conf); - } finally { - finalOut.close(); - } - return; - } - else { - TezMerger.considerFinalMergeForProgress(); - - final TezSpillRecord spillRec = new TezSpillRecord(partitions); - for (int parts = 0; parts < partitions; parts++) { - //create the segments to be merged - List segmentList = - new ArrayList(numSpills); - for(int i = 0; i < numSpills; i++) { - TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); - - Segment s = - new Segment(conf, rfs, filename[i], indexRecord.getStartOffset(), - indexRecord.getPartLength(), codec, true); - segmentList.add(i, s); - - if (LOG.isDebugEnabled()) { - LOG.debug("TaskIdentifier=" + taskIdentifier + " Partition=" + parts + - "Spill =" + i + "(" + indexRecord.getStartOffset() + "," + - indexRecord.getRawLength() + ", " + - indexRecord.getPartLength() + ")"); - } - } - - int mergeFactor = - this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, - TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR); - // sort the segments only if there are intermediate merges - boolean sortSegments = segmentList.size() > mergeFactor; - //merge - TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs, - keyClass, valClass, codec, - segmentList, mergeFactor, - new Path(taskIdentifier), - (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf), - nullProgressable, sortSegments, - null, spilledRecordsCounter, - null); // Not using any Progress in TezMerger. Should just work. - - //write merged output to disk - long segmentStart = finalOut.getPos(); - Writer writer = - new Writer(conf, finalOut, keyClass, valClass, codec, - spilledRecordsCounter); - if (combiner == null || numSpills < minSpillsForCombine) { - TezMerger.writeFile(kvIter, writer, - nullProgressable, conf); - } else { - runCombineProcessor(kvIter, writer); - } - writer.close(); - - // record offsets - final TezIndexRecord rec = - new TezIndexRecord( - segmentStart, - writer.getRawLength(), - writer.getCompressedLength()); - spillRec.putIndex(rec, parts); - } - spillRec.writeToFile(finalIndexFile, conf); - finalOut.close(); - for(int i = 0; i < numSpills; i++) { - rfs.delete(filename[i],true); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java deleted file mode 100644 index e2b3315..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java +++ /dev/null @@ -1,126 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.engine.common.sort.impl.dflt; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.IntBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.util.DataChecksum; -import org.apache.tez.engine.api.TezOutputContext; -import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader; -import org.apache.tez.engine.common.shuffle.server.ShuffleHandler; -import org.apache.tez.engine.common.sort.impl.IFile; - -public class InMemoryShuffleSorter extends DefaultSorter { - - private static final Log LOG = LogFactory.getLog(InMemoryShuffleSorter.class); - - static final int IFILE_EOF_LENGTH = - 2 * WritableUtils.getVIntSize(IFile.EOF_MARKER); - static final int IFILE_CHECKSUM_LENGTH = DataChecksum.Type.CRC32.size; - - private List spillIndices = new ArrayList(); - private List shuffleHeaders = new ArrayList(); - - ShuffleHandler shuffleHandler = new ShuffleHandler(this); - - byte[] kvbuffer; - IntBuffer kvmeta; - - @Override - public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException { - super.initialize(outputContext, conf, numOutputs); - shuffleHandler.initialize(outputContext, conf); - } - - @Override - protected void spill(int mstart, int mend) - throws IOException, InterruptedException { - // Start the shuffleHandler - shuffleHandler.start(); - - // Don't spill! - - // Make a copy - this.kvbuffer = super.kvbuffer; - this.kvmeta = super.kvmeta; - - // Just save spill-indices for serving later - int spindex = mstart; - for (int i = 0; i < partitions; ++i) { - spillIndices.add(spindex); - - int length = 0; - while (spindex < mend && - kvmeta.get(offsetFor(spindex) + PARTITION) == i) { - - final int kvoff = offsetFor(spindex); - int keyLen = - kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART); - int valLen = getInMemVBytesLength(kvoff); - length += - (keyLen + WritableUtils.getVIntSize(keyLen)) + - (valLen + WritableUtils.getVIntSize(valLen)); - - ++spindex; - } - length += IFILE_EOF_LENGTH; - - shuffleHeaders.add( - new ShuffleHeader( - outputContext.getUniqueIdentifier(), // TODO Verify that this is correct. - length + IFILE_CHECKSUM_LENGTH, length, i) - ); - LOG.info("shuffleHeader[" + i + "]:" + - " rawLen=" + length + " partLen=" + (length + IFILE_CHECKSUM_LENGTH) + - " spillIndex=" + spillIndices.get(i)); - } - - LOG.info("Saved " + spillIndices.size() + " spill-indices and " + - shuffleHeaders.size() + " shuffle headers"); - } - - @Override - public InputStream getSortedStream(int partition) { - return new SortBufferInputStream(this, partition); - } - - @Override - public void close() throws IOException { - // FIXME - //shuffleHandler.stop(); - } - - @Override - public ShuffleHeader getShuffleHeader(int reduce) { - return shuffleHeaders.get(reduce); - } - - public int getSpillIndex(int partition) { - return spillIndices.get(partition); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java deleted file mode 100644 index d74e159..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java +++ /dev/null @@ -1,271 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.engine.common.sort.impl.dflt; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.IntBuffer; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.BoundedByteArrayOutputStream; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.WritableUtils; -import org.apache.tez.engine.common.shuffle.impl.InMemoryWriter; -import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter.InMemValBytes; - - public class SortBufferInputStream extends InputStream { - - private static final Log LOG = LogFactory.getLog(SortBufferInputStream.class); - - private final InMemoryShuffleSorter sorter; - private InMemoryWriter sortOutput; - - private int mend; - private int recIndex; - private final byte[] kvbuffer; - private final IntBuffer kvmeta; - private final int partitionBytes; - private final int partition; - - byte[] dualBuf = new byte[8192]; - DualBufferOutputStream out; - private int readBytes = 0; - - public SortBufferInputStream( - InMemoryShuffleSorter sorter, int partition) { - this.sorter = sorter; - this.partitionBytes = - (int)sorter.getShuffleHeader(partition).getCompressedLength(); - this.partition = partition; - this.mend = sorter.getMetaEnd(); - this.recIndex = sorter.getSpillIndex(partition); - this.kvbuffer = sorter.kvbuffer; - this.kvmeta = sorter.kvmeta; - out = new DualBufferOutputStream(null, 0, 0, dualBuf); - sortOutput = new InMemoryWriter(out); - } - - byte[] one = new byte[1]; - - @Override - public int read() throws IOException { - int b = read(one, 0, 1); - return (b == -1) ? b : one[0]; - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (available() == 0) { - return -1; - } - - int currentOffset = off; - int currentLength = len; - int currentReadBytes = 0; - - // Check if there is residual data in the dualBuf - int residualLen = out.getCurrent(); - if (residualLen > 0) { - int readable = Math.min(currentLength, residualLen); - System.arraycopy(dualBuf, 0, b, currentOffset, readable); - currentOffset += readable; - currentReadBytes += readable; - out.setCurrentPointer(-readable); - - // buffer has less capacity - currentLength -= readable; - - if (LOG.isDebugEnabled()) { - LOG.debug("XXX read_residual:" + - " readable=" + readable + - " readBytes=" + readBytes); - } - } - - // Now, use the provided buffer - if (LOG.isDebugEnabled()) { - LOG.debug("XXX read: out.reset" + - " b=" + b + - " currentOffset=" + currentOffset + - " currentLength=" + currentLength + - " recIndex=" + recIndex); - } - out.reset(b, currentOffset, currentLength); - - // Read from sort-buffer into the provided buffer, space permitting - DataInputBuffer key = new DataInputBuffer(); - final InMemValBytes value = sorter.createInMemValBytes(); - - int kvPartition = 0; - int numRec = 0; - for (; - currentLength > 0 && recIndex < mend && - (kvPartition = getKVPartition(recIndex)) == partition; - ++recIndex) { - - final int kvoff = sorter.offsetFor(recIndex); - - int keyLen = - (kvmeta.get(kvoff + InMemoryShuffleSorter.VALSTART) - - kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART)); - key.reset( - kvbuffer, - kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART), - keyLen - ); - - int valLen = sorter.getVBytesForOffset(kvoff, value); - - int recLen = - (keyLen + WritableUtils.getVIntSize(keyLen)) + - (valLen + WritableUtils.getVIntSize(valLen)); - - currentReadBytes += recLen; - currentOffset += recLen; - currentLength -= recLen; - - // Write out key/value into the in-mem ifile - if (LOG.isDebugEnabled()) { - LOG.debug("XXX read: sortOutput.append" + - " #rec=" + ++numRec + - " recIndex=" + recIndex + " kvoff=" + kvoff + - " keyLen=" + keyLen + " valLen=" + valLen + " recLen=" + recLen + - " readBytes=" + readBytes + - " currentReadBytes=" + currentReadBytes + - " currentLength=" + currentLength); - } - sortOutput.append(key, value); - } - - // If we are at the end of the segment, close the ifile - if (currentLength > 0 && - (recIndex == mend || kvPartition != partition)) { - if (LOG.isDebugEnabled()) { - LOG.debug("XXX About to call close:" + - " currentLength=" + currentLength + - " recIndex=" + recIndex + " mend=" + mend + - " kvPartition=" + kvPartition + " partitino=" + partition); - } - sortOutput.close(); - currentReadBytes += - (InMemoryShuffleSorter.IFILE_EOF_LENGTH + - InMemoryShuffleSorter.IFILE_CHECKSUM_LENGTH); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("XXX Hmm..." + - " currentLength=" + currentLength + - " recIndex=" + recIndex + " mend=" + mend + - " kvPartition=" + kvPartition + " partitino=" + partition); - } - } - - int retVal = Math.min(currentReadBytes, len); - readBytes += retVal; - if (LOG.isDebugEnabled()) { - LOG.debug("XXX read: done" + - " retVal=" + retVal + - " currentReadBytes=" + currentReadBytes + - " len=" + len + - " readBytes=" + readBytes + - " partitionBytes=" + partitionBytes + - " residualBytes=" + out.getCurrent()); - } - return retVal; - } - - private int getKVPartition(int recIndex) { - return kvmeta.get( - sorter.offsetFor(recIndex) + InMemoryShuffleSorter.PARTITION); - } - - @Override - public int available() throws IOException { - return (partitionBytes - readBytes); - } - - @Override - public void close() throws IOException { - super.close(); - } - - @Override - public boolean markSupported() { - return false; - } - - static class DualBufferOutputStream extends BoundedByteArrayOutputStream { - - byte[] dualBuf; - int currentPointer = 0; - byte[] one = new byte[1]; - - public DualBufferOutputStream( - byte[] buf, int offset, int length, - byte[] altBuf) { - super(buf, offset, length); - this.dualBuf = altBuf; - } - - public void reset(byte[] b, int off, int len) { - super.resetBuffer(b, off, len); - } - - @Override - public void write(int b) throws IOException { - one[0] = (byte)b; - write(one, 0, 1); - } - - @Override - public void write(byte[] b) throws IOException { - write(b, 0, b.length); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - int available = super.available(); - if (available >= len) { - super.write(b, off, len); - } else { - super.write(b, off, available); - System.arraycopy(b, off+available, dualBuf, currentPointer, len-available); - currentPointer += (len - available); - } - } - - int getCurrent() { - return currentPointer; - } - - void setCurrentPointer(int delta) { - if ((currentPointer + delta) > dualBuf.length) { - throw new IndexOutOfBoundsException("Trying to set dualBuf 'current'" + - " marker to " + (currentPointer+delta) + " when " + - " dualBuf.length is " + dualBuf.length); - } - currentPointer = (currentPointer + delta) % dualBuf.length; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java deleted file mode 100644 index 841e54d..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java +++ /dev/null @@ -1,149 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.engine.common.task.impl; - -import java.io.IOException; -import java.util.Iterator; -import java.util.NoSuchElementException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.util.Progressable; -import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; - - -/** - * Iterates values while keys match in sorted input. - * - * Usage: Call moveToNext to move to the next k, v pair. This returns true if another exists, - * followed by getKey() and getValues() to get the current key and list of values. - * - */ -public class ValuesIterator implements Iterator { - protected TezRawKeyValueIterator in; //input iterator - private KEY key; // current key - private KEY nextKey; - private VALUE value; // current value - private boolean hasNext; // more w/ this key - private boolean more; // more in file - private RawComparator comparator; - protected Progressable reporter; - private Deserializer keyDeserializer; - private Deserializer valDeserializer; - private DataInputBuffer keyIn = new DataInputBuffer(); - private DataInputBuffer valueIn = new DataInputBuffer(); - - public ValuesIterator (TezRawKeyValueIterator in, - RawComparator comparator, - Class keyClass, - Class valClass, Configuration conf, - Progressable reporter) - throws IOException { - this.in = in; - this.comparator = comparator; - this.reporter = reporter; - SerializationFactory serializationFactory = new SerializationFactory(conf); - this.keyDeserializer = serializationFactory.getDeserializer(keyClass); - this.keyDeserializer.open(keyIn); - this.valDeserializer = serializationFactory.getDeserializer(valClass); - this.valDeserializer.open(this.valueIn); - readNextKey(); - key = nextKey; - nextKey = null; // force new instance creation - hasNext = more; - } - - TezRawKeyValueIterator getRawIterator() { return in; } - - /// Iterator methods - - public boolean hasNext() { return hasNext; } - - private int ctr = 0; - public VALUE next() { - if (!hasNext) { - throw new NoSuchElementException("iterate past last value"); - } - try { - readNextValue(); - readNextKey(); - } catch (IOException ie) { - throw new RuntimeException("problem advancing post rec#"+ctr, ie); - } - reporter.progress(); - return value; - } - - public void remove() { throw new RuntimeException("not implemented"); } - - /// Auxiliary methods - - /** Start processing next unique key. */ - public void nextKey() throws IOException { - // read until we find a new key - while (hasNext) { - readNextKey(); - } - ++ctr; - - // move the next key to the current one - KEY tmpKey = key; - key = nextKey; - nextKey = tmpKey; - hasNext = more; - } - - /** True iff more keys remain. */ - public boolean more() { - return more; - } - - /** The current key. */ - public KEY getKey() { - return key; - } - - /** - * read the next key - */ - private void readNextKey() throws IOException { - more = in.next(); - if (more) { - DataInputBuffer nextKeyBytes = in.getKey(); - keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength()); - nextKey = keyDeserializer.deserialize(nextKey); - hasNext = key != null && (comparator.compare(key, nextKey) == 0); - } else { - hasNext = false; - } - } - - /** - * Read the next value - * @throws IOException - */ - private void readNextValue() throws IOException { - DataInputBuffer nextValueBytes = in.getValue(); - valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength()); - value = valDeserializer.deserialize(value); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java deleted file mode 100644 index 40e6b1a..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java +++ /dev/null @@ -1,249 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.engine.common.task.local.output; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.Path; -import org.apache.tez.common.Constants; -import org.apache.tez.common.TezJobConfig; -import org.apache.tez.engine.common.InputAttemptIdentifier; - -/** - * Manipulate the working area for the transient store for maps and reduces. - * - * This class is used by map and reduce tasks to identify the directories that - * they need to write to/read from for intermediate files. The callers of - * these methods are from the Child running the Task. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class TezLocalTaskOutputFiles extends TezTaskOutput { - - public TezLocalTaskOutputFiles(Configuration conf, String uniqueId) { - super(conf, uniqueId); - } - - private LocalDirAllocator lDirAlloc = - new LocalDirAllocator(TezJobConfig.LOCAL_DIRS); - - - /** - * Return the path to local map output file created earlier - * - * @return path - * @throws IOException - */ - @Override - public Path getOutputFile() - throws IOException { - return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR - + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, conf); - } - - /** - * Create a local map output file name. - * - * @param size the size of the file - * @return path - * @throws IOException - */ - @Override - public Path getOutputFileForWrite(long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR - + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, size, conf); - } - - /** - * Create a local map output file name. This should *only* be used if the size - * of the file is not known. Otherwise use the equivalent which accepts a size - * parameter. - * - * @return path - * @throws IOException - */ - @Override - public Path getOutputFileForWrite() throws IOException { - return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR - + Path.SEPARATOR + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, - conf); - } - - /** - * Create a local map output file name on the same volume. - */ - @Override - public Path getOutputFileForWriteInVolume(Path existing) { - return new Path(existing.getParent(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING); - } - - /** - * Return the path to a local map output index file created earlier - * - * @return path - * @throws IOException - */ - @Override - public Path getOutputIndexFile() - throws IOException { - return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR - + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING, - conf); - } - - /** - * Create a local map output index file name. - * - * @param size the size of the file - * @return path - * @throws IOException - */ - @Override - public Path getOutputIndexFileForWrite(long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR - + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING, - size, conf); - } - - /** - * Create a local map output index file name on the same volume. - */ - @Override - public Path getOutputIndexFileForWriteInVolume(Path existing) { - return new Path(existing.getParent(), - Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING); - } - - /** - * Return a local map spill file created earlier. - * - * @param spillNumber the number - * @return path - * @throws IOException - */ - @Override - public Path getSpillFile(int spillNumber) - throws IOException { - return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill" - + spillNumber + ".out", conf); - } - - /** - * Create a local map spill file name. - * - * @param spillNumber the number - * @param size the size of the file - * @return path - * @throws IOException - */ - @Override - public Path getSpillFileForWrite(int spillNumber, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill" - + spillNumber + ".out", size, conf); - } - - /** - * Return a local map spill index file created earlier - * - * @param spillNumber the number - * @return path - * @throws IOException - */ - @Override - public Path getSpillIndexFile(int spillNumber) - throws IOException { - return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill" - + spillNumber + ".out.index", conf); - } - - /** - * Create a local map spill index file name. - * - * @param spillNumber the number - * @param size the size of the file - * @return path - * @throws IOException - */ - @Override - public Path getSpillIndexFileForWrite(int spillNumber, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill" - + spillNumber + ".out.index", size, conf); - } - - /** - * Return a local reduce input file created earlier - * - * @param mapId a map task id - * @return path - * @throws IOException - */ - @Override - public Path getInputFile(InputAttemptIdentifier mapId) - throws IOException { - return lDirAlloc.getLocalPathToRead(String.format( - Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, - Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getSrcTaskIndex())), conf); - } - - /** - * Create a local reduce input file name. - * - * @param mapId a map task id - * @param size the size of the file - * @return path - * @throws IOException - */ - @Override - public Path getInputFileForWrite(int taskId, - long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(String.format( - Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, Constants.TASK_OUTPUT_DIR, taskId), - size, conf); - } - - /** Removes all of the files related to a task. */ - @Override - public void removeAll() - throws IOException { - deleteLocalFiles(Constants.TASK_OUTPUT_DIR); - } - - private String[] getLocalDirs() throws IOException { - return conf.getStrings(TezJobConfig.LOCAL_DIRS); - } - - @SuppressWarnings("deprecation") - private void deleteLocalFiles(String subdir) throws IOException { - String[] localDirs = getLocalDirs(); - for (int i = 0; i < localDirs.length; i++) { - FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir)); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java deleted file mode 100644 index e1d83ad..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.engine.common.task.local.output; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.tez.engine.common.InputAttemptIdentifier; - -/** - * Manipulate the working area for the transient store for maps and reduces. - * - * This class is used by map and reduce tasks to identify the directories that - * they need to write to/read from for intermediate files. The callers of - * these methods are from child space and see mapreduce.cluster.local.dir as - * taskTracker/jobCache/jobId/attemptId - * This class should not be used from TaskTracker space. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public abstract class TezTaskOutput { - - protected Configuration conf; - protected String uniqueId; - - public TezTaskOutput(Configuration conf, String uniqueId) { - this.conf = conf; - this.uniqueId = uniqueId; - } - - /** - * Return the path to local map output file created earlier - * - * @return path - * @throws IOException - */ - public abstract Path getOutputFile() throws IOException; - - /** - * Create a local map output file name. - * - * @param size the size of the file - * @return path - * @throws IOException - */ - public abstract Path getOutputFileForWrite(long size) throws IOException; - - /** - * Create a local output file name. This method is meant to be used *only* if - * the size of the file is not know up front. - * - * @return path - * @throws IOException - */ - public abstract Path getOutputFileForWrite() throws IOException; - - /** - * Create a local map output file name on the same volume. - */ - public abstract Path getOutputFileForWriteInVolume(Path existing); - - /** - * Return the path to a local map output index file created earlier - * - * @return path - * @throws IOException - */ - public abstract Path getOutputIndexFile() throws IOException; - - /** - * Create a local map output index file name. - * - * @param size the size of the file - * @return path - * @throws IOException - */ - public abstract Path getOutputIndexFileForWrite(long size) throws IOException; - - /** - * Create a local map output index file name on the same volume. - */ - public abstract Path getOutputIndexFileForWriteInVolume(Path existing); - - /** - * Return a local map spill file created earlier. - * - * @param spillNumber the number - * @return path - * @throws IOException - */ - public abstract Path getSpillFile(int spillNumber) throws IOException; - - /** - * Create a local map spill file name. - * - * @param spillNumber the number - * @param size the size of the file - * @return path - * @throws IOException - */ - public abstract Path getSpillFileForWrite(int spillNumber, long size) - throws IOException; - - /** - * Return a local map spill index file created earlier - * - * @param spillNumber the number - * @return path - * @throws IOException - */ - public abstract Path getSpillIndexFile(int spillNumber) throws IOException; - - /** - * Create a local map spill index file name. - * - * @param spillNumber the number - * @param size the size of the file - * @return path - * @throws IOException - */ - public abstract Path getSpillIndexFileForWrite(int spillNumber, long size) - throws IOException; - - /** - * Return a local reduce input file created earlier - * - * @param attemptIdentifier The identifier for the source task - * @return path - * @throws IOException - */ - public abstract Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException; - - /** - * Create a local reduce input file name. - * - * @param taskIdentifier The identifier for the source task - * @param size the size of the file - * @return path - * @throws IOException - */ - public abstract Path getInputFileForWrite( - int taskIdentifier, long size) throws IOException; - - /** Removes all of the files related to a task. */ - public abstract void removeAll() throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java deleted file mode 100644 index b8f051b..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java +++ /dev/null @@ -1,246 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.engine.common.task.local.output; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.Path; -import org.apache.tez.common.Constants; -import org.apache.tez.common.TezJobConfig; -import org.apache.tez.engine.common.InputAttemptIdentifier; - -/** - * Manipulate the working area for the transient store for maps and reduces. - * - * This class is used by map and reduce tasks to identify the directories that - * they need to write to/read from for intermediate files. The callers of - * these methods are from child space and see mapreduce.cluster.local.dir as - * taskTracker/jobCache/jobId/attemptId - * This class should not be used from TaskTracker space. - */ - -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class TezTaskOutputFiles extends TezTaskOutput { - - public TezTaskOutputFiles(Configuration conf, String uniqueId) { - super(conf, uniqueId); - } - - private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class); - - private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out"; - private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN - + ".index"; - - - - // assume configured to $localdir/usercache/$user/appcache/$appId - private LocalDirAllocator lDirAlloc = - new LocalDirAllocator(TezJobConfig.LOCAL_DIRS); - - - private Path getAttemptOutputDir() { - if (LOG.isDebugEnabled()) { - LOG.debug("getAttemptOutputDir: " - + Constants.TASK_OUTPUT_DIR + "/" - + uniqueId); - } - return new Path(Constants.TASK_OUTPUT_DIR, uniqueId); - } - - /** - * Return the path to local map output file created earlier - * - * @return path - * @throws IOException - */ - public Path getOutputFile() throws IOException { - Path attemptOutput = - new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING); - return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf); - } - - /** - * Create a local map output file name. - * - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getOutputFileForWrite(long size) throws IOException { - Path attemptOutput = - new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING); - return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf); - } - - /** - * Create a local map output file name. This should *only* be used if the size - * of the file is not known. Otherwise use the equivalent which accepts a size - * parameter. - * - * @return path - * @throws IOException - */ - public Path getOutputFileForWrite() throws IOException { - Path attemptOutput = - new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING); - return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), conf); - } - - /** - * Create a local map output file name on the same volume. - */ - public Path getOutputFileForWriteInVolume(Path existing) { - Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR); - Path attemptOutputDir = new Path(outputDir, uniqueId); - return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING); - } - - /** - * Return the path to a local map output index file created earlier - * - * @return path - * @throws IOException - */ - public Path getOutputIndexFile() throws IOException { - Path attemptIndexOutput = - new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + - Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING); - return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf); - } - - /** - * Create a local map output index file name. - * - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getOutputIndexFileForWrite(long size) throws IOException { - Path attemptIndexOutput = - new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + - Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING); - return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(), - size, conf); - } - - /** - * Create a local map output index file name on the same volume. - */ - public Path getOutputIndexFileForWriteInVolume(Path existing) { - Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR); - Path attemptOutputDir = new Path(outputDir, uniqueId); - return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + - Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING); - } - - /** - * Return a local map spill file created earlier. - * - * @param spillNumber the number - * @return path - * @throws IOException - */ - public Path getSpillFile(int spillNumber) throws IOException { - return lDirAlloc.getLocalPathToRead( - String.format(SPILL_FILE_PATTERN, - uniqueId, spillNumber), conf); - } - - /** - * Create a local map spill file name. - * - * @param spillNumber the number - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getSpillFileForWrite(int spillNumber, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite( - String.format(String.format(SPILL_FILE_PATTERN, - uniqueId, spillNumber)), size, conf); - } - - /** - * Return a local map spill index file created earlier - * - * @param spillNumber the number - * @return path - * @throws IOException - */ - public Path getSpillIndexFile(int spillNumber) throws IOException { - return lDirAlloc.getLocalPathToRead( - String.format(SPILL_INDEX_FILE_PATTERN, - uniqueId, spillNumber), conf); - } - - /** - * Create a local map spill index file name. - * - * @param spillNumber the number - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getSpillIndexFileForWrite(int spillNumber, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite( - String.format(SPILL_INDEX_FILE_PATTERN, - uniqueId, spillNumber), size, conf); - } - - /** - * Return a local reduce input file created earlier - * - * @param attemptIdentifier an identifier for a task. The attempt information is ignored. - * @return path - * @throws IOException - */ - public Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException { - throw new UnsupportedOperationException("Incompatible with LocalRunner"); - } - - /** - * Create a local reduce input file name. - * - * @param attemptIdentifier an identifier for a task. The attempt information is ignored. - * @param size the size of the file - * @return path - * @throws IOException - */ - public Path getInputFileForWrite(int srcTaskId, - long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(String.format( - uniqueId, getAttemptOutputDir().toString(), srcTaskId), - size, conf); - } - - /** Removes all of the files related to a task. */ - public void removeAll() throws IOException { - throw new UnsupportedOperationException("Incompatible with LocalRunner"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java b/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java deleted file mode 100644 index 5071dd2..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java +++ /dev/null @@ -1,33 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.engine.hadoop.compat; - -import org.apache.hadoop.util.Progressable; - -public class NullProgressable implements Progressable { - - public NullProgressable() { - // TODO Auto-generated constructor stub - } - - @Override - public void progress() { - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java ---------------------------------------------------------------------- diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java deleted file mode 100644 index 6371787..0000000 --- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tez.engine.lib.input; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -import org.apache.tez.common.TezUtils; -import org.apache.tez.engine.api.Event; -import org.apache.tez.engine.api.LogicalInput; -import org.apache.tez.engine.api.TezInputContext; -import org.apache.tez.engine.common.localshuffle.LocalShuffle; - -/** - * LocalMergedInput in an {@link LogicalInput} which shuffles intermediate - * sorted data, merges them and provides key/ to the consumer. - */ -public class LocalMergedInput extends ShuffledMergedInputLegacy { - - @Override - public List initialize(TezInputContext inputContext) throws IOException { - this.inputContext = inputContext; - this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload()); - - LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs); - rawIter = localShuffle.run(); - createValuesIterator(); - return Collections.emptyList(); - } - - @Override - public List close() throws IOException { - rawIter.close(); - return Collections.emptyList(); - } -}