From: jhkim@apache.org
To: commits@tajo.incubator.apache.org
Date: Tue, 28 Jan 2014 12:35:55 -0000
Subject: [18/18] git commit: TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)

TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)

Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/bbf9b7bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/bbf9b7bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/bbf9b7bf

Branch: refs/heads/master
Commit: bbf9b7bf8b60a930b4e44754c732f9629762d1bf
Parents: 6aa96fa
Author: jinossy
Authored: Tue Jan 28 21:34:41 2014 +0900
Committer: jinossy
Committed: Tue Jan 28 21:34:41 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                      |    2 +
 pom.xml                                          |    1 +
 tajo-client/pom.xml                              |    2 +-
 tajo-core/pom.xml                                |    2 -
 tajo-core/tajo-core-backend/pom.xml              |    2 +-
 tajo-core/tajo-core-pullserver/pom.xml           |    2 +-
 tajo-core/tajo-core-storage/pom.xml              |  301 ---
 .../tajo/storage/AbstractStorageManager.java     |  690 -------
 .../java/org/apache/tajo/storage/Appender.java   |   39 -
 .../storage/BinarySerializerDeserializer.java    |  257 ---
 .../java/org/apache/tajo/storage/CSVFile.java    |  531 -----
 .../tajo/storage/CompressedSplitLineReader.java  |  182 --
 .../org/apache/tajo/storage/DataLocation.java    |   45 -
 .../org/apache/tajo/storage/FileAppender.java    |   61 -
 .../org/apache/tajo/storage/FileScanner.java     |   93 -
 .../org/apache/tajo/storage/FrameTuple.java      |  231 ---
 .../java/org/apache/tajo/storage/LazyTuple.java  |  291 ---
 .../org/apache/tajo/storage/LineReader.java      |  559 ------
 .../org/apache/tajo/storage/MergeScanner.java    |  154 --
 .../tajo/storage/NumericPathComparator.java      |   34 -
 .../java/org/apache/tajo/storage/RawFile.java    |  532 -----
 .../java/org/apache/tajo/storage/RowFile.java    |  506 -----
 .../org/apache/tajo/storage/RowStoreUtil.java    |  206 --
 .../java/org/apache/tajo/storage/Scanner.java    |   94 -
 .../apache/tajo/storage/SeekableScanner.java     |   28 -
 .../tajo/storage/SerializerDeserializer.java     |   34 -
 .../apache/tajo/storage/SplitLineReader.java     |   39 -
 .../java/org/apache/tajo/storage/Storage.java    |   45 -
 .../org/apache/tajo/storage/StorageManager.java  |   67 -
 .../tajo/storage/StorageManagerFactory.java      |   98 -
 .../org/apache/tajo/storage/StorageUtil.java     |   83 -
 .../apache/tajo/storage/TableStatistics.java     |  117 --
 .../storage/TextSerializerDeserializer.java      |  209 --
 .../java/org/apache/tajo/storage/Tuple.java      |   82 -
 .../apache/tajo/storage/TupleComparator.java     |  159 --
 .../org/apache/tajo/storage/TupleRange.java      |  103 -
 .../java/org/apache/tajo/storage/VTuple.java     |  226 ---
 .../storage/annotation/ForSplitableStore.java    |   29 -
 .../apache/tajo/storage/compress/CodecPool.java  |  185 --
 .../AlreadyExistsStorageException.java           |   39 -
 .../exception/UnknownCodecException.java         |   32 -
 .../exception/UnknownDataTypeException.java      |   32 -
 .../exception/UnsupportedFileTypeException.java  |   36 -
 .../tajo/storage/fragment/FileFragment.java      |  219 ---
 .../apache/tajo/storage/fragment/Fragment.java   |   31 -
 .../storage/fragment/FragmentConvertor.java      |  123 --
 .../apache/tajo/storage/index/IndexMethod.java   |   32 -
 .../apache/tajo/storage/index/IndexReader.java   |   35 -
 .../apache/tajo/storage/index/IndexWriter.java   |   33 -
 .../tajo/storage/index/OrderIndexReader.java     |   45 -
 .../apache/tajo/storage/index/bst/BSTIndex.java  |  623 ------
 .../storage/rcfile/BytesRefArrayWritable.java    |  261 ---
 .../tajo/storage/rcfile/BytesRefWritable.java    |  248 ---
 .../storage/rcfile/ColumnProjectionUtils.java    |  117 --
 .../rcfile/LazyDecompressionCallback.java        |   32 -
 .../rcfile/NonSyncByteArrayInputStream.java      |  113 --
 .../rcfile/NonSyncByteArrayOutputStream.java     |  144 --
 .../storage/rcfile/NonSyncDataInputBuffer.java   |  507 -----
 .../storage/rcfile/NonSyncDataOutputBuffer.java  |   91 -
 .../org/apache/tajo/storage/rcfile/RCFile.java   | 1739 -----------------
 .../SchemaAwareCompressionInputStream.java       |   43 -
 .../SchemaAwareCompressionOutputStream.java      |   44 -
 .../tajo/storage/trevni/TrevniAppender.java      |  201 --
 .../tajo/storage/trevni/TrevniScanner.java       |  193 --
 .../apache/tajo/storage/v2/CSVFileScanner.java   |  386 ----
 .../apache/tajo/storage/v2/DiskDeviceInfo.java   |   62 -
 .../tajo/storage/v2/DiskFileScanScheduler.java   |  205 --
 .../org/apache/tajo/storage/v2/DiskInfo.java     |   75 -
 .../apache/tajo/storage/v2/DiskMountInfo.java    |  101 -
 .../org/apache/tajo/storage/v2/DiskUtil.java     |  199 --
 .../apache/tajo/storage/v2/FileScanRunner.java   |   70 -
 .../apache/tajo/storage/v2/FileScannerV2.java    |  203 --
 .../java/org/apache/tajo/storage/v2/RCFile.java  | 1823 ------------------
 .../apache/tajo/storage/v2/RCFileScanner.java    |  297 ---
 .../apache/tajo/storage/v2/ScanScheduler.java    |  189 --
 .../tajo/storage/v2/ScheduledInputStream.java    |  513 -----
 .../tajo/storage/v2/StorageManagerV2.java        |  140 --
 .../src/main/proto/IndexProtos.proto             |   29 -
 .../src/main/resources/storage-default.xml       |  149 --
 .../tajo/storage/TestCompressionStorages.java    |  233 ---
 .../org/apache/tajo/storage/TestFrameTuple.java  |   84 -
 .../org/apache/tajo/storage/TestLazyTuple.java   |  258 ---
 .../apache/tajo/storage/TestMergeScanner.java    |  179 --
 .../apache/tajo/storage/TestStorageManager.java  |   93 -
 .../org/apache/tajo/storage/TestStorages.java    |  375 ----
 .../tajo/storage/TestTupleComparator.java        |   77 -
 .../org/apache/tajo/storage/TestVTuple.java      |  160 --
 .../apache/tajo/storage/index/TestBSTIndex.java  |  948 ---------
 .../index/TestSingleCSVFileBSTIndex.java         |  248 ---
 .../tajo/storage/v2/TestCSVCompression.java      |  213 --
 .../apache/tajo/storage/v2/TestCSVScanner.java   |  168 --
 .../apache/tajo/storage/v2/TestStorages.java     |  242 ---
 .../src/test/resources/storage-default.xml       |  149 --
 tajo-dist/pom.xml                                |    6 +
 tajo-jdbc/pom.xml                                |    2 +-
 tajo-project/pom.xml                             |    2 +-
 tajo-storage/pom.xml                             |  383 ++++
 .../tajo/storage/AbstractStorageManager.java     |  690 +++++++
 .../java/org/apache/tajo/storage/Appender.java   |   39 +
 .../storage/BinarySerializerDeserializer.java    |  257 +++
 .../java/org/apache/tajo/storage/CSVFile.java    |  531 +++++
 .../tajo/storage/CompressedSplitLineReader.java  |  182 ++
 .../org/apache/tajo/storage/DataLocation.java    |   45 +
 .../org/apache/tajo/storage/FileAppender.java    |   61 +
 .../org/apache/tajo/storage/FileScanner.java     |   93 +
 .../org/apache/tajo/storage/FrameTuple.java      |  231 +++
 .../java/org/apache/tajo/storage/LazyTuple.java  |  291 +++
 .../org/apache/tajo/storage/LineReader.java      |  559 ++++++
 .../org/apache/tajo/storage/MergeScanner.java    |  154 ++
 .../tajo/storage/NumericPathComparator.java      |   34 +
 .../java/org/apache/tajo/storage/RawFile.java    |  532 +++++
 .../java/org/apache/tajo/storage/RowFile.java    |  506 +++++
 .../org/apache/tajo/storage/RowStoreUtil.java    |  206 ++
 .../java/org/apache/tajo/storage/Scanner.java    |   94 +
 .../apache/tajo/storage/SeekableScanner.java     |   28 +
 .../tajo/storage/SerializerDeserializer.java     |   34 +
 .../apache/tajo/storage/SplitLineReader.java     |   39 +
 .../java/org/apache/tajo/storage/Storage.java    |   45 +
 .../org/apache/tajo/storage/StorageManager.java  |   67 +
 .../tajo/storage/StorageManagerFactory.java      |   98 +
 .../org/apache/tajo/storage/StorageUtil.java     |   83 +
 .../apache/tajo/storage/TableStatistics.java     |  117 ++
 .../storage/TextSerializerDeserializer.java      |  209 ++
 .../java/org/apache/tajo/storage/Tuple.java      |   82 +
 .../apache/tajo/storage/TupleComparator.java     |  159 ++
 .../org/apache/tajo/storage/TupleRange.java      |  103 +
 .../java/org/apache/tajo/storage/VTuple.java     |  226 +++
 .../storage/annotation/ForSplitableStore.java    |   29 +
 .../apache/tajo/storage/compress/CodecPool.java  |  185 ++
 .../AlreadyExistsStorageException.java           |   39 +
 .../exception/UnknownCodecException.java         |   32 +
 .../exception/UnknownDataTypeException.java      |   32 +
 .../exception/UnsupportedFileTypeException.java  |   36 +
 .../tajo/storage/fragment/FileFragment.java      |  219 +++
 .../apache/tajo/storage/fragment/Fragment.java   |   31 +
 .../storage/fragment/FragmentConvertor.java      |  123 ++
 .../apache/tajo/storage/index/IndexMethod.java   |   32 +
 .../apache/tajo/storage/index/IndexReader.java   |   35 +
 .../apache/tajo/storage/index/IndexWriter.java   |   33 +
 .../tajo/storage/index/OrderIndexReader.java     |   45 +
 .../apache/tajo/storage/index/bst/BSTIndex.java  |  623 ++++++
 .../storage/rcfile/BytesRefArrayWritable.java    |  261 +++
 .../tajo/storage/rcfile/BytesRefWritable.java    |  248 +++
 .../storage/rcfile/ColumnProjectionUtils.java    |  117 ++
 .../rcfile/LazyDecompressionCallback.java        |   32 +
 .../rcfile/NonSyncByteArrayInputStream.java      |  113 ++
 .../rcfile/NonSyncByteArrayOutputStream.java     |  144 ++
 .../storage/rcfile/NonSyncDataInputBuffer.java   |  507 +++++
 .../storage/rcfile/NonSyncDataOutputBuffer.java  |   91 +
 .../org/apache/tajo/storage/rcfile/RCFile.java   | 1739 +++++++++++++++++
 .../SchemaAwareCompressionInputStream.java       |   43 +
 .../SchemaAwareCompressionOutputStream.java      |   44 +
 .../tajo/storage/trevni/TrevniAppender.java      |  201 ++
 .../tajo/storage/trevni/TrevniScanner.java       |  193 ++
 .../apache/tajo/storage/v2/CSVFileScanner.java   |  386 ++++
 .../apache/tajo/storage/v2/DiskDeviceInfo.java   |   62 +
 .../tajo/storage/v2/DiskFileScanScheduler.java   |  205 ++
 .../org/apache/tajo/storage/v2/DiskInfo.java     |   75 +
 .../apache/tajo/storage/v2/DiskMountInfo.java    |  101 +
 .../org/apache/tajo/storage/v2/DiskUtil.java     |  199 ++
 .../apache/tajo/storage/v2/FileScanRunner.java   |   70 +
 .../apache/tajo/storage/v2/FileScannerV2.java    |  203 ++
 .../java/org/apache/tajo/storage/v2/RCFile.java  | 1823 ++++++++++++++++++
 .../apache/tajo/storage/v2/RCFileScanner.java    |  297 +++
 .../apache/tajo/storage/v2/ScanScheduler.java    |  189 ++
 .../tajo/storage/v2/ScheduledInputStream.java    |  513 +++++
 .../tajo/storage/v2/StorageManagerV2.java        |  140 ++
 tajo-storage/src/main/proto/IndexProtos.proto    |   29 +
 .../src/main/resources/storage-default.xml       |  149 ++
 .../tajo/storage/TestCompressionStorages.java    |  233 +++
 .../org/apache/tajo/storage/TestFrameTuple.java  |   84 +
 .../org/apache/tajo/storage/TestLazyTuple.java   |  258 +++
 .../apache/tajo/storage/TestMergeScanner.java    |  179 ++
 .../apache/tajo/storage/TestStorageManager.java  |   93 +
 .../org/apache/tajo/storage/TestStorages.java    |  375 ++++
 .../tajo/storage/TestTupleComparator.java        |   77 +
 .../org/apache/tajo/storage/TestVTuple.java      |  160 ++
 .../apache/tajo/storage/index/TestBSTIndex.java  |  948 +++++++++
 .../index/TestSingleCSVFileBSTIndex.java         |  248 +++
 .../tajo/storage/v2/TestCSVCompression.java      |  213 ++
 .../apache/tajo/storage/v2/TestCSVScanner.java   |  168 ++
 .../apache/tajo/storage/v2/TestStorages.java     |  242 +++
 .../src/test/resources/storage-default.xml       |  149 ++
 183 files changed, 19287 insertions(+), 19198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3f2bff..9699782 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -386,6 +386,8 @@ Release 0.8.0 - unreleased

   TASKS

+    TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)
+
     TAJO-536: Fix warnings in tajo-core-storage. (jinho)

     TAJO-545: MySQLStore Documentation. (jaehwa)


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d70f0e4..569e9be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
     <module>tajo-client</module>
     <module>tajo-jdbc</module>
     <module>tajo-dist</module>
+    <module>tajo-storage</module>


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-client/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-client/pom.xml b/tajo-client/pom.xml
index 9b4cd5e..c6fbb27 100644
--- a/tajo-client/pom.xml
+++ b/tajo-client/pom.xml
@@ -201,7 +201,7 @@
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-core-storage</artifactId>
+      <artifactId>tajo-storage</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index 09978c3..e131d0e 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -34,7 +34,6 @@
     <module>tajo-core-backend</module>
-    <module>tajo-core-storage</module>
     <module>tajo-core-pullserver</module>
@@ -161,7 +160,6 @@
       run rm -rf ${project.artifactId}-${project.version}
       run mkdir ${project.artifactId}-${project.version}
       run cd ${project.artifactId}-${project.version}
-      run cp -r ${basedir}/${project.artifactId}-storage/target/${project.artifactId}-storage-${project.version}*.jar .
       run cp -r ${basedir}/${project.artifactId}-pullserver/target/${project.artifactId}-pullserver-${project.version}*.jar .
       run cp -r ${basedir}/${project.artifactId}-backend/target/${project.artifactId}-backend-${project.version}*.jar .
       run cp -r ${basedir}/${project.artifactId}-backend/target/lib .
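
Note for dependents: this change only renames the Maven artifact (tajo-core-storage
becomes tajo-storage) and promotes it to a top-level module; the Java package
org.apache.tajo.storage is unchanged. A minimal, hypothetical sketch of client
code that keeps compiling across the rename -- the manager, meta, and schema
values are assumed to be supplied by the caller, while the class and method
names are taken from the sources moved later in this diff:

    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.catalog.TableMeta;
    import org.apache.tajo.storage.AbstractStorageManager;
    import org.apache.tajo.storage.Scanner;

    public class ReadTableSketch {
      // Counts the rows of a single data file using the relocated storage API.
      public static long countRows(AbstractStorageManager manager, TableMeta meta,
                                   Schema schema, Path dataFile) throws Exception {
        Scanner scanner = manager.getFileScanner(meta, schema, dataFile);
        scanner.init();
        long rows = 0;
        while (scanner.next() != null) { // next() returns null at end of input
          rows++;
        }
        scanner.close();
        return rows;
      }
    }
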
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index abc217e..fce9925 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -213,7 +213,7 @@
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-core-storage</artifactId>
+      <artifactId>tajo-storage</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/pom.xml b/tajo-core/tajo-core-pullserver/pom.xml
index 8c6d4fe..0bdfed2 100644
--- a/tajo-core/tajo-core-pullserver/pom.xml
+++ b/tajo-core/tajo-core-pullserver/pom.xml
@@ -44,7 +44,7 @@
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-core-storage</artifactId>
+      <artifactId>tajo-storage</artifactId>
       <scope>provided</scope>
     </dependency>


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/pom.xml b/tajo-core/tajo-core-storage/pom.xml
deleted file mode 100644
index dcbde44..0000000
--- a/tajo-core/tajo-core-storage/pom.xml
+++ /dev/null
@@ -1,301 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>tajo-project</artifactId>
-    <groupId>org.apache.tajo</groupId>
-    <version>0.8.0-SNAPSHOT</version>
-    <relativePath>../../tajo-project</relativePath>
-  </parent>
-  <artifactId>tajo-core-storage</artifactId>
-  <packaging>jar</packaging>
-  <name>Tajo Core Storage</name>
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-  </properties>
-
-  <repositories>
-    <repository>
-      <id>repository.jboss.org</id>
-      <url>https://repository.jboss.org/nexus/content/repositories/releases/</url>
-      <snapshots>
-        <enabled>false</enabled>
-      </snapshots>
-    </repository>
-  </repositories>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
-          <encoding>${project.build.sourceEncoding}</encoding>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <version>2.4</version>
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-antrun-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>create-protobuf-generated-sources-directory</id>
-            <phase>initialize</phase>
-            <configuration>
-              <target>
-                <mkdir dir="target/generated-sources/proto" />
-              </target>
-            </configuration>
-            <goals>
-              <goal>run</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>exec-maven-plugin</artifactId>
-        <version>1.2</version>
-        <executions>
-          <execution>
-            <id>generate-sources</id>
-            <phase>generate-sources</phase>
-            <configuration>
-              <executable>protoc</executable>
-              <arguments>
-                <argument>-Isrc/main/proto/</argument>
-                <argument>--proto_path=../../tajo-common/src/main/proto</argument>
-                <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
-                <argument>--java_out=target/generated-sources/proto</argument>
-                <argument>src/main/proto/IndexProtos.proto</argument>
-              </arguments>
-            </configuration>
-            <goals>
-              <goal>exec</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <version>1.5</version>
-        <executions>
-          <execution>
-            <id>add-source</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>target/generated-sources/proto</source>
-              </sources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-pmd-plugin</artifactId>
-        <version>2.7.1</version>
-      </plugin>
-    </plugins>
-  </build>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-catalog-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>trevni-core</artifactId>
-      <version>1.7.3</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>trevni-avro</artifactId>
-      <version>1.7.3</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>commons-el</groupId>
-          <artifactId>commons-el</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-runtime</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>tomcat</groupId>
-          <artifactId>jasper-compiler</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jsp-2.1-jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.sun.jersey.jersey-test-framework</groupId>
-          <artifactId>jersey-test-framework-grizzly2</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.code.gson</groupId>
-      <artifactId>gson</artifactId>
-      <type>jar</type>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <version>2.6</version>
-    </dependency>
-  </dependencies>
-
-  <profiles>
-    <profile>
-      <id>docs</id>
-      <activation>
-        <activeByDefault>false</activeByDefault>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-javadoc-plugin</artifactId>
-            <executions>
-              <execution>
-                <id>module-javadocs</id>
-                <phase>package</phase>
-                <goals>
-                  <goal>jar</goal>
-                </goals>
-                <configuration>
-                  <destDir>${project.build.directory}</destDir>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
-      <id>src</id>
-      <activation>
-        <activeByDefault>false</activeByDefault>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-source-plugin</artifactId>
-            <executions>
-              <execution>
-                <id>tajo-java-sources</id>
-                <phase>package</phase>
-                <goals>
-                  <goal>jar-no-fork</goal>
-                </goals>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-
-  <reporting>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-project-info-reports-plugin</artifactId>
-        <version>2.4</version>
-        <configuration>
-          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
-        </configuration>
-      </plugin>
-    </plugins>
-  </reporting>
-</project>


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
deleted file mode 100644
index 91a535e..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java
+++ /dev/null
@@ -1,690 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.net.util.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.FileUtil;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public abstract class AbstractStorageManager {
-  private final Log LOG = LogFactory.getLog(AbstractStorageManager.class);
-
-  protected final TajoConf conf;
-  protected final FileSystem fs;
-  protected final Path tableBaseDir;
-  protected final boolean blocksMetadataEnabled;
-
-  /**
-   * Cache of scanner handlers for each storage type.
-   */
-  protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends Scanner>>();
-
-  /**
-   * Cache of appender handlers for each storage type.
-   */
-  protected static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends FileAppender>>();
-
-  /**
-   * Cache of constructors for each class. Pins the classes so they
-   * can't be garbage collected until ReflectionUtils can be collected.
-   */
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
-      new ConcurrentHashMap<Class<?>, Constructor<?>>();
-
-  public abstract Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException;
-
-  public abstract Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException;
-
-  protected AbstractStorageManager(TajoConf conf) throws IOException {
-    this.conf = conf;
-    this.tableBaseDir = TajoConf.getWarehouseDir(conf);
-    this.fs = tableBaseDir.getFileSystem(conf);
-    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
-        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
-    if (!this.blocksMetadataEnabled)
-      LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
-  }
-
-  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
-      throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    FileStatus status = fs.getFileStatus(path);
-    FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
-    return getScanner(meta, schema, fragment);
-  }
-
-  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment) throws IOException {
-    return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), schema);
-  }
-
-  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
-    return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), target);
-  }
-
-  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
-    return getScanner(meta, schema, fragment, schema);
-  }
-
-  public FileSystem getFileSystem() {
-    return this.fs;
-  }
-
-  public Path getWarehouseDir() {
-    return this.tableBaseDir;
-  }
-
-  public void delete(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    fs.delete(tablePath, true);
-  }
-
-  public boolean exists(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    return fileSystem.exists(path);
-  }
-
-  /**
-   * This method deletes only data contained in the given path.
-   *
-   * @param path The path in which data are deleted.
-   * @throws IOException
-   */
-  public void deleteData(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    FileStatus[] fileLists = fileSystem.listStatus(path);
-    for (FileStatus status : fileLists) {
-      fileSystem.delete(status.getPath(), true);
-    }
-  }
-
-  public Path getTablePath(String tableName) {
-    return new Path(tableBaseDir, tableName);
-  }
-
-  public Appender getAppender(TableMeta meta, Schema schema, Path path)
-      throws IOException {
-    Appender appender;
-
-    Class<? extends FileAppender> appenderClass;
-
-    String handlerName = meta.getStoreType().name().toLowerCase();
-    appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
-    if (appenderClass == null) {
-      appenderClass = conf.getClass(
-          String.format("tajo.storage.appender-handler.%s.class",
-              meta.getStoreType().name().toLowerCase()), null,
-          FileAppender.class);
-      APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
-    }
-
-    if (appenderClass == null) {
-      throw new IOException("Unknown Storage Type: " + meta.getStoreType());
-    }
-
-    appender = newAppenderInstance(appenderClass, conf, meta, schema, path);
-
-    return appender;
-  }
-
-
-  public TableMeta getTableMeta(Path tablePath) throws IOException {
-    TableMeta meta;
-
-    FileSystem fs = tablePath.getFileSystem(conf);
-    Path tableMetaPath = new Path(tablePath, ".meta");
-    if (!fs.exists(tableMetaPath)) {
-      throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
-    }
-
-    FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
-
-    CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) FileUtil.loadProto(tableMetaIn,
-        CatalogProtos.TableProto.getDefaultInstance());
-    meta = new TableMeta(tableProto);
-
-    return meta;
-  }
-
-  public FileFragment[] split(String tableName) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
-    return split(tableName, tablePath, fs.getDefaultBlockSize());
-  }
-
-  public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
-    return split(tableName, tablePath, fragmentSize);
-  }
-
-  public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
-      listTablets.add(tablet);
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public FileFragment[] split(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
-  }
-
-  public FileFragment[] split(String tableName, Path tablePath) throws IOException {
-    return split(tableName, tablePath, fs.getDefaultBlockSize());
-  }
-
-  private FileFragment[] split(String tableName, Path tablePath, long size)
-      throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    long defaultBlockSize = size;
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      long remainFileSize = file.getLen();
-      long start = 0;
-      if (remainFileSize > defaultBlockSize) {
-        while (remainFileSize > defaultBlockSize) {
-          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
-          listTablets.add(tablet);
-          start += defaultBlockSize;
-          remainFileSize -= defaultBlockSize;
-        }
-        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
-      } else {
-        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
-      }
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
-                                       Path tablePath, long size)
-      throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    long defaultBlockSize = size;
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      long remainFileSize = file.getLen();
-      long start = 0;
-      if (remainFileSize > defaultBlockSize) {
-        while (remainFileSize > defaultBlockSize) {
-          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
-          listTablets.add(tablet);
-          start += defaultBlockSize;
-          remainFileSize -= defaultBlockSize;
-        }
-        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
-      } else {
-        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
-      }
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public long calculateSize(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    long totalSize = 0;
-
-    if (fs.exists(tablePath)) {
-      totalSize = fs.getContentSummary(tablePath).getLength();
-    }
-
-    return totalSize;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // FileInputFormat Area
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final PathFilter hiddenFileFilter = new PathFilter() {
-    public boolean accept(Path p) {
-      String name = p.getName();
-      return !name.startsWith("_") && !name.startsWith(".");
-    }
-  };
-
-  /**
-   * Proxy PathFilter that accepts a path only if all filters given in the
-   * constructor do. Used by the listPaths() to apply the built-in
-   * hiddenFileFilter together with a user provided one (if any).
-   */
-  private static class MultiPathFilter implements PathFilter {
-    private List<PathFilter> filters;
-
-    public MultiPathFilter(List<PathFilter> filters) {
-      this.filters = filters;
-    }
-
-    public boolean accept(Path path) {
-      for (PathFilter filter : filters) {
-        if (!filter.accept(path)) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
-  /**
-   * List input directories.
-   * Subclasses may override to, e.g., select only files matching a regular
-   * expression.
-   *
-   * @return array of FileStatus objects
-   * @throws IOException if zero items.
-   */
-  protected List<FileStatus> listStatus(Path path) throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
-    Path[] dirs = new Path[]{path};
-    if (dirs.length == 0) {
-      throw new IOException("No input paths specified in job");
-    }
-
-    List<IOException> errors = new ArrayList<IOException>();
-
-    // creates a MultiPathFilter with the hiddenFileFilter and the
-    // user provided one (if any).
-    List<PathFilter> filters = new ArrayList<PathFilter>();
-    filters.add(hiddenFileFilter);
-
-    PathFilter inputFilter = new MultiPathFilter(filters);
-
-    for (int i = 0; i < dirs.length; ++i) {
-      Path p = dirs[i];
-
-      FileSystem fs = p.getFileSystem(conf);
-      FileStatus[] matches = fs.globStatus(p, inputFilter);
-      if (matches == null) {
-        errors.add(new IOException("Input path does not exist: " + p));
-      } else if (matches.length == 0) {
-        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
-      } else {
-        for (FileStatus globStat : matches) {
-          if (globStat.isDirectory()) {
-            for (FileStatus stat : fs.listStatus(globStat.getPath(),
-                inputFilter)) {
-              result.add(stat);
-            }
-          } else {
-            result.add(globStat);
-          }
-        }
-      }
-    }
-
-    if (!errors.isEmpty()) {
-      throw new InvalidInputException(errors);
-    }
-    LOG.info("Total input paths to process : " + result.size());
-    return result;
-  }
-
-  /**
-   * Get the lower bound on split size imposed by the format.
-   *
-   * @return the number of bytes of the minimal split for this format
-   */
-  protected long getFormatMinSplitSize() {
-    return 1;
-  }
-
-  /**
-   * Is the given filename splitable? Usually, true, but if the file is
-   * stream compressed, it will not be.
-   *
-   * FileInputFormat implementations can override this and return
-   * false to ensure that individual input files are never split-up
-   * so that Mappers process entire files.
-   *
-   * @param filename the file name to check
-   * @return is this file isSplittable?
-   */
-  protected boolean isSplittable(TableMeta meta, Schema schema, Path filename) throws IOException {
-    Scanner scanner = getFileScanner(meta, schema, filename);
-    return scanner.isSplittable();
-  }
-
-  @Deprecated
-  protected long computeSplitSize(long blockSize, long minSize,
-                                  long maxSize) {
-    return Math.max(minSize, Math.min(maxSize, blockSize));
-  }
-
-  @Deprecated
-  private static final double SPLIT_SLOP = 1.1;   // 10% slop
-
-  @Deprecated
-  protected int getBlockIndex(BlockLocation[] blkLocations,
-                              long offset) {
-    for (int i = 0; i < blkLocations.length; i++) {
-      // is the offset inside this block?
-      if ((blkLocations[i].getOffset() <= offset) &&
-          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
-        return i;
-      }
-    }
-    BlockLocation last = blkLocations[blkLocations.length - 1];
-    long fileLength = last.getOffset() + last.getLength() - 1;
-    throw new IllegalArgumentException("Offset " + offset +
-        " is outside of file (0.." +
-        fileLength + ")");
-  }
-
-  /**
-   * A factory that makes the split for this class. It can be overridden
-   * by sub-classes to make sub-types
-   */
-  protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
-    return new FileFragment(fragmentId, file, start, length);
-  }
-
-  protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
-                                   int[] diskIds) throws IOException {
-    return new FileFragment(fragmentId, file, blockLocation, diskIds);
-  }
-
-  // for Non Splittable. eg, compressed gzip TextFile
-  protected FileFragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
-                                      BlockLocation[] blkLocations) throws IOException {
-
-    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
-    for (BlockLocation blockLocation : blkLocations) {
-      for (String host : blockLocation.getHosts()) {
-        if (hostsBlockMap.containsKey(host)) {
-          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
-        } else {
-          hostsBlockMap.put(host, 1);
-        }
-      }
-    }
-
-    List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
-    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
-
-      @Override
-      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
-        return v1.getValue().compareTo(v2.getValue());
-      }
-    });
-
-    String[] hosts = new String[blkLocations[0].getHosts().length];
-
-    for (int i = 0; i < hosts.length; i++) {
-      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
-      hosts[i] = entry.getKey();
-    }
-    return new FileFragment(fragmentId, file, start, length, hosts);
-  }
-
-  /**
-   * Get the maximum split size.
-   *
-   * @return the maximum number of bytes a split can include
-   */
-  @Deprecated
-  public static long getMaxSplitSize() {
-    // TODO - to be configurable
-    return 536870912L;
-  }
-
-  /**
-   * Get the minimum split size
-   *
-   * @return the minimum number of bytes that can be in a split
-   */
-  @Deprecated
-  public static long getMinSplitSize() {
-    // TODO - to be configurable
-    return 67108864L;
-  }
-
-  /**
-   * Get Disk Ids by Volume Bytes
-   */
-  private int[] getDiskIds(VolumeId[] volumeIds) {
-    int[] diskIds = new int[volumeIds.length];
-    for (int i = 0; i < volumeIds.length; i++) {
-      int diskId = -1;
-      if (volumeIds[i] != null && volumeIds[i].isValid()) {
-        String volumeIdString = volumeIds[i].toString();
-        byte[] volumeIdBytes = Base64.decodeBase64(volumeIdString);
-
-        if (volumeIdBytes.length == 4) {
-          diskId = Bytes.toInt(volumeIdBytes);
-        } else if (volumeIdBytes.length == 1) {
-          diskId = (int) volumeIdBytes[0];  // support hadoop-2.0.2
-        }
-      }
-      diskIds[i] = diskId;
-    }
-    return diskIds;
-  }
-
-  /**
-   * Generate the map of host and make them into Volume Ids.
-   *
-   */
-  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
-    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
-    for (FileFragment frag : frags) {
-      String[] hosts = frag.getHosts();
-      int[] diskIds = frag.getDiskIds();
-      for (int i = 0; i < hosts.length; i++) {
-        Set<Integer> volumeList = volumeMap.get(hosts[i]);
-        if (volumeList == null) {
-          volumeList = new HashSet<Integer>();
-          volumeMap.put(hosts[i], volumeList);
-        }
-
-        if (diskIds.length > 0 && diskIds[i] > -1) {
-          volumeList.add(diskIds[i]);
-        }
-      }
-    }
-
-    return volumeMap;
-  }
-
-  /**
-   * Generate the list of files and make them into FileSplits.
-   *
-   * @throws IOException
-   */
-  public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException {
-    // generate splits
-
-    List<FileFragment> splits = new ArrayList<FileFragment>();
-    FileSystem fs = inputPath.getFileSystem(conf);
-    List<FileStatus> files;
-    if (fs.isFile(inputPath)) {
-      files = Lists.newArrayList(fs.getFileStatus(inputPath));
-    } else {
-      files = listStatus(inputPath);
-    }
-    for (FileStatus file : files) {
-      Path path = file.getPath();
-      long length = file.getLen();
-      if (length > 0) {
-        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-        boolean splittable = isSplittable(meta, schema, path);
-        if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
-          // supported disk volume
-          BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
-              .getFileBlockStorageLocations(Arrays.asList(blkLocations));
-          if (splittable) {
-            for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
-              splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
-                  .getVolumeIds())));
-            }
-          } else { // Non splittable
-            long blockSize = blockStorageLocations[0].getLength();
-            if (blockSize >= length) {
-              for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
-                splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
-                    .getVolumeIds())));
-              }
-            } else {
-              splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
-            }
-          }
-
-        } else {
-          if (splittable) {
-            for (BlockLocation blockLocation : blkLocations) {
-              splits.add(makeSplit(tableName, meta, path, blockLocation, null));
-            }
-          } else { // Non splittable
-            splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
-          }
-        }
-      } else {
-        //for zero length files
-        splits.add(makeSplit(tableName, meta, path, 0, length));
-      }
-    }
-
-    LOG.info("Total # of splits: " + splits.size());
-    return splits;
-  }
-
-  private static class InvalidInputException extends IOException {
-    List<IOException> errors;
-
-    public InvalidInputException(List<IOException> errors) {
-      this.errors = errors;
-    }
-
-    @Override
-    public String getMessage() {
-      StringBuffer sb = new StringBuffer();
-      int messageLimit = Math.min(errors.size(), 10);
-      for (int i = 0; i < messageLimit; i++) {
-        sb.append(errors.get(i).getMessage()).append("\n");
-      }
-
-      if (messageLimit < errors.size())
-        sb.append("skipped .....").append("\n");
-
-      return sb.toString();
-    }
-  }
-
-  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
-      Configuration.class,
-      Schema.class,
-      TableMeta.class,
-      FileFragment.class
-  };
-
-  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
-      Configuration.class,
-      Schema.class,
-      TableMeta.class,
-      Path.class
-  };
-
-  /**
-   * create a scanner instance.
-   */
-  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
-                                         Fragment fragment) {
-    T result;
-    try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
-
-  /**
-   * create an appender instance.
-   */
-  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta, Schema schema,
-                                          Path path) {
-    T result;
-    try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(new Object[]{conf, schema, meta, path});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
-}
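
The private split()/splitNG() methods above carve each file into fragments of
the requested size plus a shorter tail, and getSplits() additionally keeps
non-splittable files (e.g., gzip-compressed text) whole. A self-contained
illustration of the fragmentation arithmetic only, with invented names and no
HDFS dependency:

    import java.util.ArrayList;
    import java.util.List;

    public class SplitArithmeticSketch {
      // Mirrors the loop in AbstractStorageManager.split(): full-size fragments
      // first, then the remainder. Each long[] holds {startOffset, length}.
      // For fileLen=600 and size=256 this returns {0,256}, {256,256}, {512,88};
      // a file no larger than `size` yields a single fragment.
      public static List<long[]> fragmentRanges(long fileLen, long size) {
        List<long[]> ranges = new ArrayList<long[]>();
        long start = 0;
        long remaining = fileLen;
        while (remaining > size) {
          ranges.add(new long[]{start, size});
          start += size;
          remaining -= size;
        }
        ranges.add(new long[]{start, remaining});
        return ranges;
      }
    }
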

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java
deleted file mode 100644
index ed6ea34..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.statistics.TableStats;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public interface Appender extends Closeable {
-
-  void init() throws IOException;
-
-  void addTuple(Tuple t) throws IOException;
-
-  void flush() throws IOException;
-
-  void close() throws IOException;
-
-  void enableStats();
-
-  TableStats getStats();
-}
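
A hypothetical write-side counterpart to the interface above, assuming a
manager, table metadata, schema, and tuple source provided by the caller.
getAppender() is defined in AbstractStorageManager earlier in this diff; in the
CSV implementation later in the diff, statistics are set up during init(), so
enable them first:

    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.catalog.TableMeta;
    import org.apache.tajo.catalog.statistics.TableStats;
    import org.apache.tajo.storage.AbstractStorageManager;
    import org.apache.tajo.storage.Appender;
    import org.apache.tajo.storage.Tuple;

    public class WriteTableSketch {
      public static TableStats writeAll(AbstractStorageManager manager, TableMeta meta,
                                        Schema schema, Path out, List<Tuple> tuples) throws Exception {
        Appender appender = manager.getAppender(meta, schema, out);
        appender.enableStats();      // opt in to TableStats collection before init()
        appender.init();
        for (Tuple t : tuples) {
          appender.addTuple(t);
        }
        appender.flush();
        appender.close();
        return appender.getStats();  // null unless stats were enabled
      }
    }
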

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
deleted file mode 100644
index ed034be..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Message;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.util.Bytes;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class BinarySerializerDeserializer implements SerializerDeserializer {
-
-  static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
-
-  @Override
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
-      throws IOException {
-    byte[] bytes;
-    int length = 0;
-    if (datum == null || datum instanceof NullDatum) {
-      return 0;
-    }
-
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-      case BIT:
-      case CHAR:
-        bytes = datum.asByteArray();
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case INT2:
-        length = writeShort(out, datum.asInt2());
-        break;
-      case INT4:
-        length = writeVLong(out, datum.asInt4());
-        break;
-      case INT8:
-        length = writeVLong(out, datum.asInt8());
-        break;
-      case FLOAT4:
-        length = writeFloat(out, datum.asFloat4());
-        break;
-      case FLOAT8:
-        length = writeDouble(out, datum.asFloat8());
-        break;
-      case TEXT: {
-        bytes = datum.asTextBytes();
-        length = datum.size();
-        if (length == 0) {
-          bytes = INVALID_UTF__SINGLE_BYTE;
-          length = INVALID_UTF__SINGLE_BYTE.length;
-        }
-        out.write(bytes, 0, bytes.length);
-        break;
-      }
-      case BLOB:
-      case INET4:
-      case INET6:
-        bytes = datum.asByteArray();
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case PROTOBUF:
-        ProtobufDatum protobufDatum = (ProtobufDatum) datum;
-        bytes = protobufDatum.asByteArray();
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case NULL_TYPE:
-        break;
-      default:
-        throw new IOException("Does not support type");
-    }
-    return length;
-  }
-
-  @Override
-  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
-    if (length == 0) return NullDatum.get();
-
-    Datum datum;
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-        datum = DatumFactory.createBool(bytes[offset]);
-        break;
-      case BIT:
-        datum = DatumFactory.createBit(bytes[offset]);
-        break;
-      case CHAR: {
-        byte[] chars = new byte[length];
-        System.arraycopy(bytes, offset, chars, 0, length);
-        datum = DatumFactory.createChar(chars);
-        break;
-      }
-      case INT2:
-        datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
-        break;
-      case INT4:
-        datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
-        break;
-      case INT8:
-        datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
-        break;
-      case FLOAT4:
-        datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
-        break;
-      case FLOAT8:
-        datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
-        break;
-      case TEXT: {
-        byte[] chars = new byte[length];
-        System.arraycopy(bytes, offset, chars, 0, length);
-
-        if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
-          datum = DatumFactory.createText(new byte[0]);
-        } else {
-          datum = DatumFactory.createText(chars);
-        }
-        break;
-      }
-      case PROTOBUF: {
-        ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
-        Message.Builder builder = factory.newBuilder();
-        builder.mergeFrom(bytes, offset, length);
-        datum = factory.createDatum(builder);
-        break;
-      }
-      case INET4:
-        datum = DatumFactory.createInet4(bytes, offset, length);
-        break;
-      case BLOB:
-        datum = DatumFactory.createBlob(bytes, offset, length);
-        break;
-      default:
-        datum = NullDatum.get();
-    }
-    return datum;
-  }
-
-  private byte[] shortBytes = new byte[2];
-
-  public int writeShort(OutputStream out, short val) throws IOException {
-    shortBytes[0] = (byte) (val >> 8);
-    shortBytes[1] = (byte) val;
-    out.write(shortBytes, 0, 2);
-    return 2;
-  }
-
-  public float toFloat(byte[] bytes, int offset, int length) {
-    Preconditions.checkArgument(length == 4);
-
-    int val = ((bytes[offset] & 0x000000FF) << 24) +
-        ((bytes[offset + 1] & 0x000000FF) << 16) +
-        ((bytes[offset + 2] & 0x000000FF) << 8) +
-        (bytes[offset + 3] & 0x000000FF);
-    return Float.intBitsToFloat(val);
-  }
-
-  private byte[] floatBytes = new byte[4];
-
-  public int writeFloat(OutputStream out, float f) throws IOException {
-    int val = Float.floatToIntBits(f);
-
-    floatBytes[0] = (byte) (val >> 24);
-    floatBytes[1] = (byte) (val >> 16);
-    floatBytes[2] = (byte) (val >> 8);
-    floatBytes[3] = (byte) val;
-    out.write(floatBytes, 0, 4);
-    return floatBytes.length;
-  }
-
-  public double toDouble(byte[] bytes, int offset, int length) {
-    Preconditions.checkArgument(length == 8);
-    long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
-        ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
-        ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
-        ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
-        ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
-        ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
-        ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
-        (long) (bytes[offset + 7] & 0x00000000000000FF);
-    return Double.longBitsToDouble(val);
-  }
-
-  private byte[] doubleBytes = new byte[8];
-
-  public int writeDouble(OutputStream out, double d) throws IOException {
-    long val = Double.doubleToLongBits(d);
-
-    doubleBytes[0] = (byte) (val >> 56);
-    doubleBytes[1] = (byte) (val >> 48);
-    doubleBytes[2] = (byte) (val >> 40);
-    doubleBytes[3] = (byte) (val >> 32);
-    doubleBytes[4] = (byte) (val >> 24);
-    doubleBytes[5] = (byte) (val >> 16);
-    doubleBytes[6] = (byte) (val >> 8);
-    doubleBytes[7] = (byte) val;
-    out.write(doubleBytes, 0, 8);
-    return doubleBytes.length;
-  }
-
-  private byte[] vLongBytes = new byte[9];
-
-  public int writeVLongToByteArray(byte[] bytes, int offset, long l) {
-    if (l >= -112 && l <= 127) {
-      bytes[offset] = (byte) l;
-      return 1;
-    }
-
-    int len = -112;
-    if (l < 0) {
-      l ^= -1L; // take one's complement
-      len = -120;
-    }
-
-    long tmp = l;
-    while (tmp != 0) {
-      tmp = tmp >> 8;
-      len--;
-    }
-
-    bytes[offset++] = (byte) len;
-    len = (len < -120) ? -(len + 120) : -(len + 112);
-
-    for (int idx = len; idx != 0; idx--) {
-      int shiftbits = (idx - 1) * 8;
-      bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
-    }
-    return 1 + len;
-  }
-
-  public int writeVLong(OutputStream out, long l) throws IOException {
-    int len = writeVLongToByteArray(vLongBytes, 0, l);
-    out.write(vLongBytes, 0, len);
-    return len;
-  }
-}
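
writeVLongToByteArray() above uses the Hadoop-style variable-length integer
layout: values in [-112, 127] occupy a single byte; otherwise a marker byte
encodes the sign and payload width, followed by big-endian payload bytes. For
example, 300 encodes as three bytes (-114, 0x01, 0x2C) instead of eight. A
short check of that worked example against Hadoop's equivalent encoder
(assuming the two encodings match, as the marker arithmetic above suggests):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.util.Arrays;
    import org.apache.hadoop.io.WritableUtils;

    public class VLongSketch {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        WritableUtils.writeVLong(new DataOutputStream(bos), 300L);
        // marker -114 means "positive, 2 payload bytes"; payload 0x012C == 300
        System.out.println(Arrays.toString(bos.toByteArray())); // [-114, 1, 44]
      }
    }
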
CompressionOutputStream deflateFilter; - private char delimiter; - private TableStatistics stats = null; - private Compressor compressor; - private CompressionCodecFactory codecFactory; - private CompressionCodec codec; - private Path compressedPath; - private byte[] nullChars; - private int BUFFER_SIZE = 128 * 1024; - private int bufferedBytes = 0; - private long pos = 0; - - private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); - private SerializerDeserializer serde; - - public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException { - super(conf, schema, meta, path); - this.fs = path.getFileSystem(conf); - this.meta = meta; - this.schema = schema; - this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(DELIMITER, DELIMITER_DEFAULT)).charAt(0); - this.columnNum = schema.getColumnNum(); - String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(NULL)); - if (StringUtils.isEmpty(nullCharacters)) { - nullChars = NullDatum.get().asTextBytes(); - } else { - nullChars = nullCharacters.getBytes(); - } - } - - @Override - public void init() throws IOException { - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.toString()); - } - - String codecName = this.meta.getOption(TableMeta.COMPRESSION_CODEC); - if(!StringUtils.isEmpty(codecName)){ - codecFactory = new CompressionCodecFactory(conf); - codec = codecFactory.getCodecByClassName(codecName); - compressor = CodecPool.getCompressor(codec); - if(compressor != null) compressor.reset(); //builtin gzip is null - - String extension = codec.getDefaultExtension(); - compressedPath = path.suffix(extension); - - if (fs.exists(compressedPath)) { - throw new AlreadyExistsStorageException(compressedPath); - } - - fos = fs.create(compressedPath); - deflateFilter = codec.createOutputStream(fos, compressor); - outputStream = new DataOutputStream(deflateFilter); - - } else { - if (fs.exists(path)) { - throw new AlreadyExistsStorageException(path); - } - fos = fs.create(path); - outputStream = new DataOutputStream(new BufferedOutputStream(fos)); - } - - if (enabledStats) { - this.stats = new TableStatistics(this.schema); - } - - try { - String serdeClass = this.meta.getOption(SERDE, TextSerializerDeserializer.class.getName()); - serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IOException(e); - } - - os.reset(); - pos = fos.getPos(); - bufferedBytes = 0; - super.init(); - } - - - @Override - public void addTuple(Tuple tuple) throws IOException { - Datum datum; - int rowBytes = 0; - - for (int i = 0; i < columnNum; i++) { - datum = tuple.get(i); - rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars); - - if(columnNum - 1 > i){ - os.write((byte) delimiter); - rowBytes += 1; - } - if (enabledStats) { - stats.analyzeField(i, datum); - } - } - os.write(LF); - rowBytes += 1; - - pos += rowBytes; - bufferedBytes += rowBytes; - if(bufferedBytes > BUFFER_SIZE){ - flushBuffer(); - } - // Statistical section - if (enabledStats) { - stats.incrementRow(); - } - } - - private void flushBuffer() throws IOException { - if(os.getLength() > 0) { - os.writeTo(outputStream); - os.reset(); - bufferedBytes = 0; - } - } - @Override - public long getOffset() throws IOException { - return pos; - } - - @Override - public void flush() throws IOException { - flushBuffer(); - outputStream.flush(); - } - - @Override - public void 
close() throws IOException { - - try { - flush(); - - // Statistical section - if (enabledStats) { - stats.setNumBytes(getOffset()); - } - - if(deflateFilter != null) { - deflateFilter.finish(); - deflateFilter.resetState(); - deflateFilter = null; - } - - os.close(); - } finally { - IOUtils.cleanup(LOG, fos); - if (compressor != null) { - CodecPool.returnCompressor(compressor); - compressor = null; - } - } - } - - @Override - public TableStats getStats() { - if (enabledStats) { - return stats.getTableStat(); - } else { - return null; - } - } - - public boolean isCompress() { - return compressor != null; - } - - public String getExtension() { - return codec != null ? codec.getDefaultExtension() : ""; - } - } - - public static class CSVScanner extends FileScanner implements SeekableScanner { - public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) - throws IOException { - super(conf, schema, meta, fragment); - factory = new CompressionCodecFactory(conf); - codec = factory.getCodec(fragment.getPath()); - if (codec == null || codec instanceof SplittableCompressionCodec) { - splittable = true; - } - - //Delimiter - String delim = meta.getOption(DELIMITER, DELIMITER_DEFAULT); - this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0); - - String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(NULL)); - if (StringUtils.isEmpty(nullCharacters)) { - nullChars = NullDatum.get().asTextBytes(); - } else { - nullChars = nullCharacters.getBytes(); - } - } - - private final static int DEFAULT_PAGE_SIZE = 256 * 1024; - private char delimiter; - private FileSystem fs; - private FSDataInputStream fis; - private InputStream is; //decompressd stream - private CompressionCodecFactory factory; - private CompressionCodec codec; - private Decompressor decompressor; - private Seekable filePosition; - private boolean splittable = false; - private long startOffset, end, pos; - private int currentIdx = 0, validIdx = 0, recordCount = 0; - private int[] targetColumnIndexes; - private boolean eof = false; - private final byte[] nullChars; - private SplitLineReader reader; - private ArrayList fileOffsets = new ArrayList(); - private ArrayList rowLengthList = new ArrayList(); - private ArrayList startOffsets = new ArrayList(); - private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE); - private SerializerDeserializer serde; - - @Override - public void init() throws IOException { - - // FileFragment information - if(fs == null) { - fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath()); - } - if(fis == null) fis = fs.open(fragment.getPath()); - - recordCount = 0; - pos = startOffset = fragment.getStartKey(); - end = startOffset + fragment.getEndKey(); - - if (codec != null) { - decompressor = CodecPool.getDecompressor(codec); - if (codec instanceof SplittableCompressionCodec) { - SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream( - fis, decompressor, startOffset, end, - SplittableCompressionCodec.READ_MODE.BYBLOCK); - - reader = new CompressedSplitLineReader(cIn, conf, null); - startOffset = cIn.getAdjustedStart(); - end = cIn.getAdjustedEnd(); - filePosition = cIn; - is = cIn; - } else { - is = new DataInputStream(codec.createInputStream(fis, decompressor)); - reader = new SplitLineReader(is, null); - filePosition = fis; - } - } else { - fis.seek(startOffset); - filePosition = fis; - is = fis; - reader = new SplitLineReader(is, null); - } - - if 
-        targets = schema.toArray();
-      }
-
-      targetColumnIndexes = new int[targets.length];
-      for (int i = 0; i < targets.length; i++) {
-        targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
-      }
-
-      try {
-        String serdeClass = this.meta.getOption(SERDE, TextSerializerDeserializer.class.getName());
-        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        throw new IOException(e);
-      }
-
-      super.init();
-      Arrays.sort(targetColumnIndexes);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end +
-            "," + fs.getFileStatus(fragment.getPath()).getLen());
-      }
-
-      if (startOffset != 0) {
-        startOffset += reader.readLine(new Text(), 0, maxBytesToConsume(startOffset));
-        pos = startOffset;
-      }
-      eof = false;
-      page();
-    }
-
-    private int maxBytesToConsume(long pos) {
-      return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
-    }
-
-    private long fragmentable() throws IOException {
-      return end - getFilePosition();
-    }
-
-    private long getFilePosition() throws IOException {
-      long retVal;
-      if (isCompress()) {
-        retVal = filePosition.getPos();
-      } else {
-        retVal = pos;
-      }
-      return retVal;
-    }
-
-    private void page() throws IOException {
-      // Index initialization
-      currentIdx = 0;
-      validIdx = 0;
-      int currentBufferPos = 0;
-      int bufferedSize = 0;
-
-      buffer.reset();
-      startOffsets.clear();
-      rowLengthList.clear();
-      fileOffsets.clear();
-
-      if(eof) return;
-
-      while (DEFAULT_PAGE_SIZE >= bufferedSize){
-
-        int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
-
-        if(ret == 0){
-          break;
-        } else {
-          fileOffsets.add(pos);
-          pos += ret;
-          startOffsets.add(currentBufferPos);
-          currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
-          bufferedSize += ret;
-          validIdx++;
-          recordCount++;
-        }
-
-        if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){
-          eof = true;
-          break;
-        }
-      }
-    }
-
-    @Override
-    public Tuple next() throws IOException {
-      try {
-        if (currentIdx == validIdx) {
-          if (eof) {
-            return null;
-          } else {
-            page();
-
-            if(currentIdx == validIdx){
-              return null;
-            }
-          }
-        }
-
-        long offset = -1;
-        if(!isCompress()){
-          offset = fileOffsets.get(currentIdx);
-        }
-
-        byte[][] cells = Bytes.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
-            rowLengthList.get(currentIdx), delimiter, targetColumnIndexes);
-        currentIdx++;
-        return new LazyTuple(schema, cells, offset, nullChars, serde);
-      } catch (Throwable t) {
-        LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
-        LOG.error("Tuple list current index: " + currentIdx, t);
-        throw new IOException(t);
-      }
-    }
-
-    private boolean isCompress() {
-      return codec != null;
-    }
-
-    @Override
-    public void reset() throws IOException {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-        decompressor = null;
-      }
-
-      init();
-    }
-
-    @Override
-    public void close() throws IOException {
-      try {
-        IOUtils.cleanup(LOG, reader, is, fis);
-        fs = null;
-        is = null;
-        fis = null;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("CSVScanner processed record:" + recordCount);
-        }
-      } finally {
-        if (decompressor != null) {
-          CodecPool.returnDecompressor(decompressor);
-          decompressor = null;
-        }
-      }
-    }
-
-    @Override
-    public boolean isProjectable() {
-      return true;
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public void setSearchCondition(Object expr) {
-    }
-
-    @Override
-    public void seek(long offset) throws IOException {
-      if(isCompress()) throw new UnsupportedException();
-
-      int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset);
-
-      if (tupleIndex > -1) {
-        this.currentIdx = tupleIndex;
-      } else if (isSplittable() && end >= offset || startOffset <= offset) {
-        eof = false;
-        fis.seek(offset);
-        pos = offset;
-        reader.reset();
-        this.currentIdx = 0;
-        this.validIdx = 0;
-        // pageBuffer();
-      } else {
-        throw new IOException("invalid offset " +
-            " < start : " + startOffset + " , " +
-            " end : " + end + " , " +
-            " filePos : " + filePosition.getPos() + " , " +
-            " input offset : " + offset + " >");
-      }
-    }
-
-    @Override
-    public long getNextOffset() throws IOException {
-      if(isCompress()) throw new UnsupportedException();
-
-      if (this.currentIdx == this.validIdx) {
-        if (fragmentable() <= 0) {
-          return -1;
-        } else {
-          page();
-          if(currentIdx == validIdx) return -1;
-        }
-      }
-      return fileOffsets.get(currentIdx);
-    }
-
-    @Override
-    public boolean isSplittable(){
-      return splittable;
-    }
-  }
-}
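For reference, the appender and scanner deleted above were driven through a simple lifecycle: init, then addTuple/next in a loop, then flush/close. A minimal sketch, assuming a Schema, TableMeta, and FileFragment have already been built elsewhere; the helper method name is illustrative and not part of this commit:

  // Illustrative only: copies one CSV fragment to a new CSV file using the
  // CSVFile.CSVAppender and CSVFile.CSVScanner classes shown above.
  public static void copyCsvFragment(Configuration conf, Schema schema, TableMeta meta,
                                     FileFragment in, Path out) throws IOException {
    CSVFile.CSVScanner scanner = new CSVFile.CSVScanner(conf, schema, meta, in);
    CSVFile.CSVAppender appender = new CSVFile.CSVAppender(conf, schema, meta, out);
    scanner.init();    // seeks to the fragment start and pages the first records
    appender.init();   // creates the output file (or its compressed variant)
    try {
      Tuple tuple;
      while ((tuple = scanner.next()) != null) {  // null marks the end of the fragment
        appender.addTuple(tuple);                 // buffered, flushed past 128 * 1024 bytes
      }
      appender.flush();
    } finally {
      scanner.close();
      appender.close();
    }
  }
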

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
deleted file mode 100644
index 4f58e68..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-
-/**
- * Line reader for compressed splits
- *
- * Reading records from a compressed split is tricky, as the
- * LineRecordReader is using the reported compressed input stream
- * position directly to determine when a split has ended.  In addition the
- * compressed input stream is usually faking the actual byte position, often
- * updating it only after the first compressed block after the split is
- * accessed.
- *
- * Depending upon where the last compressed block of the split ends relative
- * to the record delimiters it can be easy to accidentally drop the last
- * record or duplicate the last record between this split and the next.
- *
- * Split end scenarios:
- *
- * 1) Last block of split ends in the middle of a record
- *      Nothing special that needs to be done here, since the compressed input
- *      stream will report a position after the split end once the record
- *      is fully read.  The consumer of the next split will discard the
- *      partial record at the start of the split normally, and no data is lost
- *      or duplicated between the splits.
- *
- * 2) Last block of split ends in the middle of a delimiter
- *      The line reader will continue to consume bytes into the next block to
- *      locate the end of the delimiter.  If a custom delimiter is being used
- *      then the next record must be read by this split or it will be dropped.
- *      The consumer of the next split will not recognize the partial
- *      delimiter at the beginning of its split and will discard it along with
- *      the next record.
- *
- *      However for the default delimiter processing there is a special case
- *      because CR, LF, and CRLF are all valid record delimiters.  If the
- *      block ends with a CR then the reader must peek at the next byte to see
- *      if it is an LF and therefore part of the same record delimiter.
- *      Peeking at the next byte is an access to the next block and triggers
- *      the stream to report the end of the split.  There are two cases based
- *      on the next byte:
- *
- *      A) The next byte is LF
- *         The split needs to end after the current record is returned.  The
- *         consumer of the next split will discard the first record, which
- *         is degenerate since LF is itself a delimiter, and start consuming
- *         records after that byte.  If the current split tries to read
- *         another record then the record will be duplicated between splits.
- *
- *      B) The next byte is not LF
- *         The current record will be returned but the stream will report
- *         the split has ended due to the peek into the next block.  If the
- *         next record is not read then it will be lost, as the consumer of
- *         the next split will discard it before processing subsequent
- *         records.  Therefore the next record beyond the reported split end
- *         must be consumed by this split to avoid data loss.
- *
- * 3) Last block of split ends at the beginning of a delimiter
- *      This is equivalent to case 1, as the reader will consume bytes into
- *      the next block and trigger the end of the split.  No further records
- *      should be read as the consumer of the next split will discard the
- *      (degenerate) record at the beginning of its split.
- *
- * 4) Last block of split ends at the end of a delimiter
- *      Nothing special needs to be done here.  The reader will not start
- *      examining the bytes into the next block until the next record is read,
- *      so the stream will not report the end of the split just yet.  Once the
- *      next record is read then the next block will be accessed and the
- *      stream will indicate the end of the split.  The consumer of the next
- *      split will correctly discard the first record of its split, and no
- *      data is lost or duplicated.
- *
- *      If the default delimiter is used and the block ends at a CR then this
- *      is treated as case 2 since the reader does not yet know without
- *      looking at subsequent bytes whether the delimiter has ended.
- *
- * NOTE: It is assumed that compressed input streams *never* return bytes from
- *       multiple compressed blocks from a single read.  Failure to do so will
- *       violate the buffering performed by this class, as it will access
- *       bytes into the next block after the split before returning all of the
- *       records from the previous block.
- */
-
-public class CompressedSplitLineReader extends SplitLineReader {
-  SplitCompressionInputStream scin;
-  private boolean usingCRLF;
-  private boolean needAdditionalRecord = false;
-  private boolean finished = false;
-
-  public CompressedSplitLineReader(SplitCompressionInputStream in,
-                                   Configuration conf,
-                                   byte[] recordDelimiterBytes)
-      throws IOException {
-    super(in, conf, recordDelimiterBytes);
-    scin = in;
-    usingCRLF = (recordDelimiterBytes == null);
-  }
-
-  @Override
-  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
-      throws IOException {
-    int bytesRead = in.read(buffer);
-
-    // If the split ended in the middle of a record delimiter then we need
-    // to read one additional record, as the consumer of the next split will
-    // not recognize the partial delimiter as a record.
-    // However if using the default delimiter and the next character is a
-    // linefeed then next split will treat it as a delimiter all by itself
-    // and the additional record read should not be performed.
-    if (inDelimiter && bytesRead > 0) {
-      if (usingCRLF) {
-        needAdditionalRecord = (buffer[0] != '\n');
-      } else {
-        needAdditionalRecord = true;
-      }
-    }
-    return bytesRead;
-  }
-
-  @Override
-  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
-      throws IOException {
-    int bytesRead = 0;
-    if (!finished) {
-      // only allow at most one more record to be read after the stream
-      // reports the split ended
-      if (scin.getPos() > scin.getAdjustedEnd()) {
-        finished = true;
-      }
-
-      bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
-    }
-    return bytesRead;
-  }
-
-  @Override
-  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
-      , int maxBytesToConsume) throws IOException {
-    int bytesRead = 0;
-    if (!finished) {
-      // only allow at most one more record to be read after the stream
-      // reports the split ended
-      if (scin.getPos() > scin.getAdjustedEnd()) {
-        finished = true;
-      }
-
-      bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume);
-    }
-    return bytesRead;
-  }
-
-  @Override
-  public boolean needAdditionalRecordAfterSplit() {
-    return !finished && needAdditionalRecord;
-  }
-}
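The class javadoc above describes the subtle CR/LF peek at a split boundary (case 2). The decision it reduces to can be illustrated standalone; this is a sketch of the same rule implemented in fillBuffer(), not the Tajo class itself:

  // Standalone illustration of the delimiter-peek decision: after the split
  // ends inside a delimiter, the first byte of the next block determines
  // whether this split must consume one extra record.
  static boolean needsAdditionalRecord(byte firstByteOfNextBlock, boolean usingDefaultDelimiter) {
    if (usingDefaultDelimiter) {
      // A CR followed by LF is one CRLF delimiter; the lone LF is a degenerate
      // record that the next split discards on its own, so no extra record is
      // needed. Any other byte means the delimiter already ended, and the
      // record it starts must be read by this split to avoid losing it.
      return firstByteOfNextBlock != '\n';
    }
    // With a custom delimiter, a partial delimiter at the boundary always
    // forces this split to read one more record.
    return true;
  }
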

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
deleted file mode 100644
index 8841a31..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/DataLocation.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-public class DataLocation {
-  private String host;
-  private int volumeId;
-
-  public DataLocation(String host, int volumeId) {
-    this.host = host;
-    this.volumeId = volumeId;
-  }
-
-  public String getHost() {
-    return host;
-  }
-
-  public int getVolumeId() {
-    return volumeId;
-  }
-
-  @Override
-  public String toString() {
-    return "DataLocation{" +
-        "host=" + host +
-        ", volumeId=" + volumeId +
-        '}';
-  }
-}
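DataLocation is an effectively immutable host/volume pair. A trivial usage sketch, purely illustrative (the host name is made up):

  // Illustrative only: constructing and reading a DataLocation.
  DataLocation location = new DataLocation("worker01.example.com", 2);
  String host = location.getHost();      // "worker01.example.com"
  int volume = location.getVolumeId();   // 2
  // location.toString() renders as: DataLocation{host=worker01.example.com, volumeId=2}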