Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 11695 invoked from network); 4 Apr 2008 15:51:23 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 4 Apr 2008 15:51:23 -0000 Received: (qmail 41083 invoked by uid 500); 4 Apr 2008 15:51:23 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 41047 invoked by uid 500); 4 Apr 2008 15:51:23 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 41038 invoked by uid 99); 4 Apr 2008 15:51:23 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Apr 2008 08:51:23 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Apr 2008 15:50:39 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5194E1A9832; Fri, 4 Apr 2008 08:50:58 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r644741 - in /hadoop/core/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ Date: Fri, 04 Apr 2008 15:50:55 -0000 To: core-commits@hadoop.apache.org From: ddas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080404155058.5194E1A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ddas Date: Fri Apr 4 08:50:52 2008 New Revision: 644741 URL: http://svn.apache.org/viewvc?rev=644741&view=rev Log: HADOOP-2826. Deprecated FileSplit.getFile(), LineRecordReader.readLine(). Contributed by Amareshwari Sriramadasu. Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=644741&r1=644740&r2=644741&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Fri Apr 4 08:50:52 2008 @@ -83,6 +83,9 @@ availability zone as the cluster. Ganglia monitoring and large instance sizes have also been added. (Chris K Wensel via tomwhite) + HADOOP-2826. Deprecated FileSplit.getFile(), LineRecordReader.readLine(). + (Amareshwari Sriramadasu via ddas) + NEW FEATURES HADOOP-1398. Add HBase in-memory block cache. (tomwhite) Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=644741&r1=644740&r2=644741&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original) +++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Fri Apr 4 08:50:52 2008 @@ -36,11 +36,11 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.TaskLog; +import org.apache.hadoop.mapred.LineRecordReader.LineReader; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.fs.FileSystem; @@ -354,14 +354,16 @@ * @param val: value of a record * @throws IOException */ - void splitKeyVal(byte[] line, Text key, Text val) throws IOException { - int pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields()); + void splitKeyVal(byte[] line, int length, Text key, Text val) + throws IOException { + int pos = UTF8ByteArrayUtils.findNthByte(line, 0, length, + (byte)this.getFieldSeparator(), this.getNumOfKeyFields()); try { if (pos == -1) { - key.set(line); + key.set(line, 0, length); val.set(""); } else { - UTF8ByteArrayUtils.splitKeyVal(line, key, val, pos); + UTF8ByteArrayUtils.splitKeyVal(line, 0, length, key, val, pos); } } catch (CharacterCodingException e) { LOG.warn(StringUtils.stringifyException(e)); @@ -377,15 +379,18 @@ } public void run() { + LineReader lineReader = null; try { Text key = new Text(); Text val = new Text(); + Text line = new Text(); + lineReader = new LineReader((InputStream)clientIn_, job_); // 3/4 Tool to Hadoop - while ((answer = UTF8ByteArrayUtils.readLine((InputStream) clientIn_)) != null) { - - splitKeyVal(answer, key, val); + while (lineReader.readLine(line) > 0) { + answer = line.getBytes(); + splitKeyVal(answer, line.getLength(), key, val); output.collect(key, val); - + line.clear(); numRecWritten_++; long now = System.currentTimeMillis(); if (now-lastStdoutReport > reporterOutDelay_) { @@ -396,6 +401,9 @@ logflush(); } } + if (lineReader != null) { + lineReader.close(); + } if (clientIn_ != null) { clientIn_.close(); clientIn_ = null; @@ -405,6 +413,9 @@ outerrThreadsThrowable = th; LOG.warn(StringUtils.stringifyException(th)); try { + if (lineReader != null) { + lineReader.close(); + } if (clientIn_ != null) { clientIn_.close(); clientIn_ = null; @@ -433,18 +444,23 @@ } public void run() { - byte[] line; + Text line = new Text(); + LineReader lineReader = null; try { - while ((line = UTF8ByteArrayUtils.readLine((InputStream) clientErr_)) != null) { - String lineStr = new String(line, "UTF-8"); - System.err.println(lineStr); + lineReader = new LineReader((InputStream)clientErr_, job_); + while (lineReader.readLine(line) > 0) { + System.err.println(line.toString()); long now = System.currentTimeMillis(); if (reporter != null && now-lastStderrReport > reporterErrDelay_) { lastStderrReport = now; reporter.progress(); } + line.clear(); } - if (clientErr_ != null) { + if (lineReader != null) { + lineReader.close(); + } + if (clientErr_ != null) { clientErr_.close(); clientErr_ = null; LOG.info("MRErrorThread done"); @@ -453,6 +469,9 @@ outerrThreadsThrowable = th; LOG.warn(StringUtils.stringifyException(th)); try { + if (lineReader != null) { + lineReader.close(); + } if (clientErr_ != null) { clientErr_.close(); clientErr_ = null; Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?rev=644741&r1=644740&r2=644741&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original) +++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Fri Apr 4 08:50:52 2008 @@ -52,7 +52,7 @@ start_ = split_.getStart(); length_ = split_.getLength(); end_ = start_ + length_; - splitName_ = split_.getFile().getName(); + splitName_ = split_.getPath().getName(); reporter_ = reporter; job_ = job; fs_ = fs; @@ -128,8 +128,8 @@ } else { recStr = record.toString(); } - String unqualSplit = split_.getFile().getName() + ":" + split_.getStart() + "+" - + split_.getLength(); + String unqualSplit = split_.getPath().getName() + ":" + + split_.getStart() + "+" + split_.getLength(); String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos + " " + unqualSplit + " Processing record=" + recStr; status += " " + splitName_; Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?rev=644741&r1=644740&r2=644741&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java (original) +++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java Fri Apr 4 08:50:52 2008 @@ -18,12 +18,10 @@ package org.apache.hadoop.streaming; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.LineRecordReader; +import org.apache.hadoop.mapred.LineRecordReader.LineReader; /** * General utils for byte array containing UTF-8 encoded strings @@ -67,12 +65,12 @@ * Find the nth occurrence of the given byte b in a UTF-8 encoded string * @param utf a byte array containing a UTF-8 encoded string * @param start starting offset - * @param end ending position + * @param length the length of byte array * @param b the byte to find * @param n the desired occurrence of the given byte * @return position that nth occurrence of the given byte if exists; otherwise -1 */ - private static int findNthByte(byte [] utf, int start, int length, byte b, int n) { + public static int findNthByte(byte [] utf, int start, int length, byte b, int n) { int pos = -1; int nextStart = start; for (int i = 0; i < n; i++) { @@ -148,16 +146,14 @@ /** * Read a utf8 encoded line from a data input stream. - * @param in data input stream - * @return a byte array containing the line + * @param lineReader LineReader to read the line from. + * @param out Text to read into + * @return number of bytes read * @throws IOException */ - public static byte [] readLine(InputStream in) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - long bytes = LineRecordReader.readLine(in, baos); - baos.close(); - if (bytes <= 0) - return null; - return baos.toByteArray(); + public static int readLine(LineReader lineReader, Text out) + throws IOException { + out.clear(); + return lineReader.readLine(out); } } Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java?rev=644741&r1=644740&r2=644741&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java (original) +++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java Fri Apr 4 08:50:52 2008 @@ -35,6 +35,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.LineRecordReader.LineReader; import org.apache.hadoop.util.ToolRunner; /** @@ -190,14 +192,14 @@ public void run() { try { in_ = connectInputStream(); - while (true) { - byte[] b = UTF8ByteArrayUtils.readLine(in_); - if (b == null) { - break; - } - buf_.append(new String(b, "UTF-8")); + LineReader lineReader = new LineReader((InputStream)in_, conf_); + Text line = new Text(); + while (lineReader.readLine(line) > 0) { + buf_.append(line.toString()); buf_.append('\n'); + line.clear(); } + lineReader.close(); in_.close(); } catch (IOException io) { throw new RuntimeException(io); Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java?rev=644741&r1=644740&r2=644741&view=diff ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java (original) +++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java Fri Apr 4 08:50:52 2008 @@ -63,9 +63,6 @@ this.hosts = hosts; } - /** @deprecated Call {@link #getPath()} instead. */ - public File getFile() { return new File(file.toString()); } - /** The file containing this split's data. */ public Path getPath() { return file; } Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=644741&r1=644740&r2=644741&view=diff ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original) +++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java Fri Apr 4 08:50:52 2008 @@ -74,7 +74,7 @@ * @param conf configuration * @throws IOException */ - LineReader(InputStream in, Configuration conf) throws IOException { + public LineReader(InputStream in, Configuration conf) throws IOException { this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE)); } @@ -220,43 +220,6 @@ return false; } - /** - * @deprecated - */ - public static long readLine(InputStream in, - OutputStream out) throws IOException { - long bytes = 0; - while (true) { - - int b = in.read(); - if (b == -1) { - break; - } - bytes += 1; - - byte c = (byte)b; - if (c == '\n') { - break; - } - - if (c == '\r') { - in.mark(1); - byte nextC = (byte)in.read(); - if (nextC != '\n') { - in.reset(); - } else { - bytes += 1; - } - break; - } - - if (out != null) { - out.write(c); - } - } - return bytes; - } - /** * Get the progress within the split */ Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java?rev=644741&r1=644740&r2=644741&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Fri Apr 4 08:50:52 2008 @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.mapred.LineRecordReader.LineReader; import org.apache.hadoop.util.ReflectionUtils; public class TestKeyValueTextInputFormat extends TestCase { @@ -128,48 +129,39 @@ } } - - private InputStream makeStream(String str) throws IOException { - Text text = new Text(str); - return new ByteArrayInputStream(text.getBytes(), 0, text.getLength()); + private LineReader makeStream(String str) throws IOException { + return new LineRecordReader.LineReader(new ByteArrayInputStream + (str.getBytes("UTF-8")), + defaultConf); } public void testUTF8() throws Exception { - InputStream in = makeStream("abcd\u20acbdcd\u20ac"); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - LineRecordReader.readLine(in, out); + LineReader in = makeStream("abcd\u20acbdcd\u20ac"); Text line = new Text(); - line.set(out.toByteArray()); + in.readLine(line); assertEquals("readLine changed utf8 characters", "abcd\u20acbdcd\u20ac", line.toString()); in = makeStream("abc\u200axyz"); - out.reset(); - LineRecordReader.readLine(in, out); - line.set(out.toByteArray()); + in.readLine(line); assertEquals("split on fake newline", "abc\u200axyz", line.toString()); } public void testNewLines() throws Exception { - InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee"); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - LineRecordReader.readLine(in, out); - assertEquals("line1 length", 1, out.size()); - out.reset(); - LineRecordReader.readLine(in, out); - assertEquals("line2 length", 2, out.size()); - out.reset(); - LineRecordReader.readLine(in, out); - assertEquals("line3 length", 0, out.size()); - out.reset(); - LineRecordReader.readLine(in, out); - assertEquals("line4 length", 3, out.size()); - out.reset(); - LineRecordReader.readLine(in, out); - assertEquals("line5 length", 4, out.size()); - out.reset(); - LineRecordReader.readLine(in, out); - assertEquals("line5 length", 5, out.size()); - assertEquals("end of file", 0, LineRecordReader.readLine(in, out)); + LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee"); + Text out = new Text(); + in.readLine(out); + assertEquals("line1 length", 1, out.getLength()); + in.readLine(out); + assertEquals("line2 length", 2, out.getLength()); + in.readLine(out); + assertEquals("line3 length", 0, out.getLength()); + in.readLine(out); + assertEquals("line4 length", 3, out.getLength()); + in.readLine(out); + assertEquals("line5 length", 4, out.getLength()); + in.readLine(out); + assertEquals("line5 length", 5, out.getLength()); + assertEquals("end of file", 0, in.readLine(out)); } private static void writeFile(FileSystem fs, Path name,