Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7B80C17EFE for ; Fri, 10 Oct 2014 04:33:09 +0000 (UTC) Received: (qmail 25752 invoked by uid 500); 10 Oct 2014 04:33:08 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 25673 invoked by uid 500); 10 Oct 2014 04:33:08 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 25261 invoked by uid 99); 10 Oct 2014 04:33:08 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Oct 2014 04:33:08 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 6FC7719252; Fri, 10 Oct 2014 04:33:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jihoonson@apache.org To: commits@tajo.apache.org Date: Fri, 10 Oct 2014 04:33:37 -0000 Message-Id: <69e8feab50d24d9ea27c9afa4733011a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [31/43] git commit: TAJO-1097: IllegalArgumentException: RawFileScanner. (Mai Hai Thanh via jinho) TAJO-1097: IllegalArgumentException: RawFileScanner. (Mai Hai Thanh via jinho) Closes #183 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0b2ea889 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0b2ea889 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0b2ea889 Branch: refs/heads/index_support Commit: 0b2ea889b06166539aabdc0fdd9029008e5f1f7a Parents: 0dfa397 Author: jhkim Authored: Wed Oct 8 17:00:20 2014 +0900 Committer: jhkim Committed: Wed Oct 8 17:00:20 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/storage/RawFile.java | 12 ++- .../org/apache/tajo/storage/TestStorages.java | 96 ++++++++++++++++++-- 3 files changed, 99 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0b2ea889/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 7aa7a0c..6122682 100644 --- a/CHANGES +++ b/CHANGES @@ -163,6 +163,8 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-1097: IllegalArgumentException: RawFileScanner. (Mai Hai Thanh via jinho) + TAJO-1103: Insert clause of partitioned table loses some FetchImpls. (jinho) TAJO-1101: Broadcast join with a zero-length file table returns wrong result data. http://git-wip-us.apache.org/repos/asf/tajo/blob/0b2ea889/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java index edcf686..c8ac3a2 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java @@ -108,7 +108,9 @@ public class RawFile { headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize // initial read - channel.position(fragment.getStartKey()); + if (fragment.getStartKey() > 0) { + channel.position(fragment.getStartKey()); + } numBytesRead = channel.read(buffer); buffer.flip(); @@ -151,7 +153,11 @@ public class RawFile { long realRemaining = fragment.getEndKey() - numBytesRead; numBytesRead += bytesRead; if (realRemaining < bufferSize) { - buffer.limit(currentDataSize + (int) realRemaining); + int newLimit = currentDataSize + (int) realRemaining; + if(newLimit > bufferSize) { + newLimit = bufferSize; + } + buffer.limit(newLimit); } return true; } @@ -382,7 +388,7 @@ public class RawFile { // clear the buffer buffer.clear(); // reload initial buffer - channel.position(0); + channel.position(fragment.getStartKey()); numBytesRead = channel.read(buffer); buffer.flip(); eof = false; http://git-wip-us.apache.org/repos/asf/tajo/blob/0b2ea889/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java index 3bea740..5d1b652 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -18,6 +18,7 @@ package org.apache.tajo.storage; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -50,9 +51,9 @@ import org.junit.runners.Parameterized; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.List; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @RunWith(Parameterized.class) @@ -97,13 +98,15 @@ public class TestStorages { private StoreType storeType; private boolean splitable; private boolean statsable; + private boolean seekable; private Path testDir; private FileSystem fs; - public TestStorages(StoreType type, boolean splitable, boolean statsable) throws IOException { + public TestStorages(StoreType type, boolean splitable, boolean statsable, boolean seekable) throws IOException { this.storeType = type; this.splitable = splitable; this.statsable = statsable; + this.seekable = seekable; conf = new TajoConf(); @@ -118,12 +121,12 @@ public class TestStorages { @Parameterized.Parameters public static Collection generateParameters() { return Arrays.asList(new Object[][] { - {StoreType.CSV, true, true}, - {StoreType.RAW, false, false}, - {StoreType.RCFILE, true, true}, - {StoreType.PARQUET, false, false}, - {StoreType.SEQUENCEFILE, true, true}, - {StoreType.AVRO, false, false}, + {StoreType.CSV, true, true, true}, + {StoreType.RAW, false, false, true}, + {StoreType.RCFILE, true, true, false}, + {StoreType.PARQUET, false, false, false}, + {StoreType.SEQUENCEFILE, true, true, false}, + {StoreType.AVRO, false, false, false}, }); } @@ -773,4 +776,81 @@ public class TestStorages { } } + @Test + public void testSeekableScanner() throws IOException { + if (!seekable) { + return; + } + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("comment", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + Path tablePath = new Path(testDir, "Seekable.data"); + FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, + tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 100000; + VTuple vTuple; + + List offsets = Lists.newArrayList(); + offsets.add(0L); + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(3); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + vTuple.put(2, DatumFactory.createText("test")); + appender.addTuple(vTuple); + + // find a seek position + if (i % (tupleNum / 3) == 0) { + offsets.add(appender.getOffset()); + } + } + + // end of file + if (!offsets.contains(appender.getOffset())) { + offsets.add(appender.getOffset()); + } + + appender.close(); + if (statsable) { + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + } + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(status.getLen(), appender.getOffset()); + + Scanner scanner; + int tupleCnt = 0; + long prevOffset = 0; + long readBytes = 0; + long readRows = 0; + for (long offset : offsets) { + scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, + new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema); + scanner.init(); + + while (scanner.next() != null) { + tupleCnt++; + } + + scanner.close(); + if (statsable) { + readBytes += scanner.getInputStats().getNumBytes(); + readRows += scanner.getInputStats().getNumRows(); + } + prevOffset = offset; + } + + assertEquals(tupleNum, tupleCnt); + if (statsable) { + assertEquals(appender.getStats().getNumBytes().longValue(), readBytes); + assertEquals(appender.getStats().getNumRows().longValue(), readRows); + } + } }