Return-Path: Delivered-To: apmail-incubator-pig-commits-archive@locus.apache.org Received: (qmail 93486 invoked from network); 24 Jan 2008 22:03:23 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 24 Jan 2008 22:03:23 -0000 Received: (qmail 75747 invoked by uid 500); 24 Jan 2008 22:02:59 -0000 Delivered-To: apmail-incubator-pig-commits-archive@incubator.apache.org Received: (qmail 75703 invoked by uid 500); 24 Jan 2008 22:02:59 -0000 Mailing-List: contact pig-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: pig-dev@incubator.apache.org Delivered-To: mailing list pig-commits@incubator.apache.org Received: (qmail 75664 invoked by uid 99); 24 Jan 2008 22:02:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Jan 2008 14:02:59 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Jan 2008 22:02:30 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B86EC1A9832; Thu, 24 Jan 2008 14:02:21 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r615045 - in /incubator/pig/trunk: CHANGES.txt src/org/apache/pig/builtin/PigStorage.java src/org/apache/pig/builtin/TextLoader.java src/org/apache/pig/impl/io/BufferedPositionedInputStream.java test/org/apache/pig/test/TestBuiltin.java Date: Thu, 24 Jan 2008 22:02:20 -0000 To: pig-commits@incubator.apache.org From: olga@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080124220221.B86EC1A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: olga Date: Thu Jan 24 14:02:19 2008 New Revision: 615045 URL: http://svn.apache.org/viewvc?rev=615045&view=rev Log: Fix for PIG-63 Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=615045&r1=615044&r2=615045&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Thu Jan 24 14:02:19 2008 @@ -66,3 +66,5 @@ comparator function instead of Class.forName. (gates) PIG-56: Made DataBag implement Iterable. (groves via gates) + + PIG-63: Fix for non-ascii UTF-8 data (breed@ and olgan@) Modified: incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=615045&r1=615044&r2=615045&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original) +++ incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Jan 24 14:02:19 2008 @@ -17,18 +17,12 @@ */ package org.apache.pig.builtin; -import java.io.BufferedReader; -import java.io.DataInputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.io.OutputStream; -import java.text.SimpleDateFormat; -import java.util.Iterator; +import java.nio.charset.Charset; import org.apache.pig.LoadFunc; import org.apache.pig.StoreFunc; -import org.apache.pig.data.TimestampedTuple; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.BufferedPositionedInputStream; @@ -40,11 +34,10 @@ */ public class PigStorage implements LoadFunc, StoreFunc { protected BufferedPositionedInputStream in = null; - private DataInputStream inData = null; - long end = Long.MAX_VALUE; - private String recordDel = "\n"; + private byte recordDel = (byte)'\n'; private String fieldDel = "\t"; + final private static Charset utf8 = Charset.forName("UTF8"); public PigStorage() { } @@ -66,7 +59,7 @@ return null; } String line; - if((line = inData.readLine()) != null) { + if((line = in.readLine(utf8, recordDel)) != null) { return new Tuple(line, fieldDel); } return null; @@ -74,11 +67,10 @@ public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException { this.in = in; - inData = new DataInputStream(in); this.end = end; // Since we are not block aligned we throw away the first - // record and cound on a different instance to read it + // record and could on a different instance to read it if (offset != 0) { getNext(); } @@ -90,7 +82,7 @@ } public void putNext(Tuple f) throws IOException { - os.write((f.toDelimitedString(this.fieldDel) + this.recordDel).getBytes()); + os.write((f.toDelimitedString(this.fieldDel) + (char)this.recordDel).getBytes(utf8)); } public void finish() throws IOException { Modified: incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java?rev=615045&r1=615044&r2=615045&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java (original) +++ incubator/pig/trunk/src/org/apache/pig/builtin/TextLoader.java Thu Jan 24 14:02:19 2008 @@ -17,8 +17,8 @@ */ package org.apache.pig.builtin; -import java.io.DataInputStream; import java.io.IOException; +import java.nio.charset.Charset; import org.apache.pig.LoadFunc; import org.apache.pig.data.DataAtom; @@ -32,16 +32,14 @@ */ public class TextLoader implements LoadFunc{ BufferedPositionedInputStream in; - private DataInputStream inData = null; - + final private static Charset utf8 = Charset.forName("UTF8"); long end; public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException { this.in = in; - inData = new DataInputStream(in); this.end = end; // Since we are not block aligned we throw away the first - // record and cound on a different instance to read it + // record and could on a different instance to read it if (offset != 0) getNext(); } @@ -50,7 +48,7 @@ if (in == null || in.getPosition() > end) return null; String line; - if ((line = inData.readLine()) != null) { + if ((line = in.readLine(utf8, (byte)'\n')) != null) { Tuple t = new Tuple(1); t.setField(0, new DataAtom(line)); return t; Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java?rev=615045&r1=615044&r2=615045&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java Thu Jan 24 14:02:19 2008 @@ -20,6 +20,12 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; import org.apache.tools.bzip2r.CBZip2InputStream; @@ -66,5 +72,92 @@ return pos; } + /* + * Preallocated array for readline buffering + */ + private byte barray[] = new byte[1024]; + + /* + * Preallocated ByteBuffer for readline buffering + */ + private ByteBuffer bbuff = ByteBuffer.wrap(barray); + + /* + * Preallocated char array for decoding bytes + */ + private char carray[] = new char[1024]; + + /* + * Preallocated CharBuffer for decoding bytes + */ + private CharBuffer cbuff = CharBuffer.wrap(carray); + public String readLine(Charset charset, byte delimiter) throws IOException { + CharsetDecoder decoder = charset.newDecoder(); + decoder.onMalformedInput(CodingErrorAction.REPLACE); + decoder.onUnmappableCharacter(CodingErrorAction.REPLACE); + int delim = delimiter&0xff; + int rc; + int offset = 0; + StringBuilder sb = null; + CoderResult res; + while ((rc = read())!=-1) { + if (rc == delim) { + break; + } + barray[offset++] = (byte)rc; + if (barray.length == offset) { + bbuff.position(0); + bbuff.limit(barray.length); + cbuff.position(0); + cbuff.limit(carray.length); + res = decoder.decode(bbuff, cbuff, false); + if (res.isError()) { + throw new IOException("Decoding error: " + res.toString()); + } + offset = bbuff.remaining(); + switch (offset) { + default: + System.arraycopy(barray, bbuff.position(), barray, 0, bbuff + .remaining()); + break; + case 2: + barray[1] = barray[barray.length - 1]; + barray[0] = barray[barray.length - 2]; + break; + case 1: + barray[0] = barray[barray.length - 1]; + break; + case 0: + } + if (sb == null) { + sb = new StringBuilder(cbuff.position()); + } + sb.append(carray, 0, cbuff.position()); + } + } + if (sb == null) { + if (rc == -1 && offset == 0) { + // We are at EOF with nothing read + return null; + } + sb = new StringBuilder(); + } + bbuff.position(0); + bbuff.limit(offset); + cbuff.position(0); + cbuff.limit(carray.length); + res = decoder.decode(bbuff, cbuff, true); + if (res.isError()) { + System.out.println("Error"); + } + sb.append(carray, 0, cbuff.position()); + cbuff.position(0); + res = decoder.flush(cbuff); + if (res.isError()) { + System.out.println("Error"); + } + sb.append(carray, 0, cbuff.position()); + return sb.toString(); + } } Modified: incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=615045&r1=615044&r2=615045&view=diff ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original) +++ incubator/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Thu Jan 24 14:02:19 2008 @@ -18,9 +18,6 @@ package org.apache.pig.test; import java.io.File; -import java.io.FileOutputStream; -import java.io.InputStream; -import java.io.OutputStream; import java.io.PrintWriter; import java.util.Iterator; @@ -39,11 +36,8 @@ import org.apache.pig.data.DataBag; import org.apache.pig.data.DataMap; import org.apache.pig.data.Tuple; -import org.apache.pig.PigServer.ExecType; import org.apache.pig.impl.builtin.ShellBagEvalFunc; -import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.BufferedPositionedInputStream; -import org.apache.pig.impl.PigContext; public class TestBuiltin extends TestCase { @@ -312,6 +306,28 @@ p1.bindTo(null, new BufferedPositionedInputStream(ffis1), 0, input1.getBytes().length); Tuple f1 = p1.getNext(); assertTrue(f1.arity() == arity1); + + LoadFunc p15 = new PigStorage(); + StringBuilder sb = new StringBuilder(); + int LOOP_COUNT = 1024; + for (int i = 0; i < LOOP_COUNT; i++) { + for (int j = 0; j < LOOP_COUNT; j++) { + sb.append(i + "\t" + i + "\t" + j % 2 + "\n"); + } + } + FakeFSInputStream ffis15 = new FakeFSInputStream(sb.toString() + .getBytes()); + p15.bindTo(null, new BufferedPositionedInputStream(ffis15), 0, input1 + .getBytes().length); + int count = 0; + while (true) { + Tuple f15 = p15.getNext(); + if (f15 == null) + break; + count++; + assertEquals(3, f15.arity()); + } + assertEquals(LOOP_COUNT * LOOP_COUNT, count); String input2 = ":this:has:a:leading:colon\n"; int arity2 = 6;