Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 15974 invoked from network); 16 Jul 2007 18:31:57 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 16 Jul 2007 18:31:57 -0000 Received: (qmail 79439 invoked by uid 500); 16 Jul 2007 18:05:18 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 79415 invoked by uid 500); 16 Jul 2007 18:05:18 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 79402 invoked by uid 99); 16 Jul 2007 18:05:18 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Jul 2007 11:05:18 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Jul 2007 11:05:15 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 3C7321A981A; Mon, 16 Jul 2007 11:04:55 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r556684 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskLog.java Date: Mon, 16 Jul 2007 18:04:55 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070716180455.3C7321A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cutting Date: Mon Jul 16 11:04:54 2007 New Revision: 556684 URL: http://svn.apache.org/viewvc?view=rev&rev=556684 Log: HADOOP-1524. Permit user task logs to appear as they are created. Contributed by Michael Bieniosek. Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=556684&r1=556683&r2=556684 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon Jul 16 11:04:54 2007 @@ -351,6 +351,9 @@ 109. HADOOP-1597. Add status reports and post-upgrade options to HDFS distributed upgrade. (Konstantin Shvachko via cutting) +110. HADOOP-1524. Permit user task logs to appear as they're + created. (Michael Bieniosek via cutting) + Release 0.13.0 - 2007-06-08 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?view=diff&rev=556684&r1=556683&r2=556684 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Mon Jul 16 11:04:54 2007 @@ -238,12 +238,13 @@ throws IOException { currentSplit = getLogSplit(split); LOG.debug("About to create the split: " + currentSplit); + // Record the 'split' in the index + writeIndexRecord(); return new BufferedOutputStream(new FileOutputStream(currentSplit)); } private synchronized void writeIndexRecord() throws IOException { - String indexRecord = currentSplit + "|" + splitOffset + "|" + - splitLength + "\n"; + String indexRecord = currentSplit + "|" + splitOffset + "\n"; splitIndex.write(indexRecord.getBytes()); splitIndex.flush(); } @@ -253,9 +254,6 @@ LOG.debug("About to rotate-out the split: " + noSplits); out.close(); - // Record the 'split' in the index - writeIndexRecord(); - // Re-initialize the state splitOffset += splitLength; splitLength = 0; @@ -312,12 +310,10 @@ private static class IndexRecord { String splitName; long splitOffset; - long splitLength; - IndexRecord(String splitName, long splitOffset, long splitLength) { + IndexRecord(String splitName, long splitOffset) { this.splitName = splitName; this.splitOffset = splitOffset; - this.splitLength = splitLength; } } @@ -331,26 +327,26 @@ String line; while ((line = splitIndex.readLine()) != null) { String[] fields = line.split("\\|"); - if (fields.length != 3) { + if (fields.length != 2) { throw new IOException("Malformed split-index with " + fields.length + " fields"); } IndexRecord record = new IndexRecord( fields[0], - Long.valueOf(fields[1]).longValue(), - Long.valueOf(fields[2]).longValue() + Long.valueOf(fields[1]).longValue() ); - LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset + - ", " + record.splitLength + ">"); + LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset + ">"); // Save records.add(record); - logFileSize += record.splitLength; } indexRecords = new IndexRecord[records.size()]; indexRecords = records.toArray(indexRecords); + IndexRecord lastRecord = indexRecords[records.size() - 1]; + logFileSize = lastRecord.splitOffset + + new File(lastRecord.splitName).length(); initialized = true; LOG.debug("Log size: " + logFileSize); } @@ -384,34 +380,28 @@ // Get all splits Vector streams = new Vector(); - int totalLogSize = 0; for (int i=0; i < indexRecords.length; ++i) { InputStream stream = getLogSplit(i); if (stream != null) { streams.add(stream); - totalLogSize += indexRecords[i].splitLength; LOG.debug("Added split: " + i); } } - LOG.debug("Total log-size on disk: " + totalLogSize + - "; actual log-size: " + logFileSize); + LOG.debug("Total log-size on disk: " + logFileSize); // Copy log data into buffer - byte[] b = new byte[totalLogSize]; + byte[] b = new byte[(int) logFileSize]; SequenceInputStream in = new SequenceInputStream(streams.elements()); try { - int bytesRead = 0, totalBytesRead = 0; - int off = 0, len = totalLogSize; - LOG.debug("Attempting to read " + len + " bytes from logs"); - while ((bytesRead = in.read(b, off, len)) > 0) { + int bytesRead = 0; + int off = 0; + LOG.debug("Attempting to read " + logFileSize + " bytes from logs"); + while ((bytesRead = in.read(b, off, (int) logFileSize - off)) > 0) { LOG.debug("Got " + bytesRead + " bytes"); off += bytesRead; - len -= bytesRead; - - totalBytesRead += bytesRead; } - if (totalBytesRead != totalLogSize) { + if (off != logFileSize) { LOG.debug("Didn't not read all requisite data in logs!"); } } finally {