From: jackylk@apache.org
To: commits@carbondata.apache.org
Date: Tue, 04 Jul 2017 14:05:05 -0000
Subject: [8/8] carbondata git commit: [CARBONDATA-1244] Polish docs and comments in presto integration

[CARBONDATA-1244] Polish docs and comments in presto integration

This closes #1131

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b699ee6f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b699ee6f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b699ee6f

Branch: refs/heads/encoding_override
Commit: b699ee6f72d280ff0969663f598769e66d8abdb9
Parents: bbb95ce
Author: bianhq
Authored: Tue Jul 4 01:36:42 2017 +0800
Committer: chenliang613
Committed: Tue Jul 4 11:52:19 2017 +0800

----------------------------------------------------------------------
 integration/presto/README.md                  |  51 ++++--
 .../presto/CarbondataConnectorFactory.java    |   2 +-
 .../presto/impl/CarbonLocalInputSplit.java    |  13 +-
 .../presto/impl/CarbonTableCacheModel.java    |   2 +-
 .../presto/impl/CarbonTableReader.java        | 154 +++++++++++++++++--
 5 files changed, 185 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/README.md
----------------------------------------------------------------------
diff --git a/integration/presto/README.md b/integration/presto/README.md
index 9935478..dc14cb0 100644
--- a/integration/presto/README.md
+++ b/integration/presto/README.md
@@ -20,14 +20,10 @@ Please follow the below steps to query carbondata in presto

 ### Config presto server

-* Download presto server 0.166 : https://repo1.maven.org/maven2/com/facebook/presto/presto-server/
-* Finish configuration as per https://prestodb.io/docs/current/installation/deployment.html
-  for example:
+* Download presto server (0.166 is suggested and supported): https://repo1.maven.org/maven2/com/facebook/presto/presto-server/
+* Finish the presto configuration following https://prestodb.io/docs/current/installation/deployment.html.
+  A configuration example:
 ```
-  carbondata.properties:
-  connector.name=carbondata
-  carbondata-store=/Users/apple/DEMO/presto_test/data
-
   config.properties:
   coordinator=true
   node-scheduler.include-coordinator=true
@@ -57,30 +53,51 @@ Please follow the below steps to query carbondata in presto
   node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
   node.data-dir=/Users/apple/DEMO/presto_test/data
 ```
-* config carbondata-connector for presto
+* Config the carbondata connector for presto

-  First:compile carbondata-presto integration module
+  Firstly: Compile carbondata, including the carbondata-presto integration module
   ```
   $ git clone https://github.com/apache/carbondata
-  $ cd carbondata/integration/presto
-  $ mvn clean package
+  $ cd carbondata
+  $ mvn -DskipTests -P{spark-version} -Dspark.version={spark-version-number} -Dhadoop.version={hadoop-version-number} clean package
+  ```
+  Replace the spark and hadoop versions with the versions used in your cluster.
+  For example, if you are using Spark 2.1.0 and Hadoop 2.7.2, you should compile with:
+  ```
+  mvn -DskipTests -Pspark-2.1 -Dspark.version=2.1.0 -Dhadoop.version=2.7.2 clean package
+  ```
+
+  Secondly: Create a folder named 'carbondata' under $PRESTO_HOME$/plugin and
+  copy all jars from carbondata/integration/presto/target/carbondata-presto-x.x.x-SNAPSHOT
+  to $PRESTO_HOME$/plugin/carbondata
+
+  Thirdly: Create a carbondata.properties file under $PRESTO_HOME$/etc/catalog/ containing the following contents:
   ```
-  Second:create one folder "carbondata" under ./presto-server-0.166/plugin
-  Third:copy all jar from ./carbondata/integration/presto/target/carbondata-presto-x.x.x-SNAPSHOT
-        to ./presto-server-0.166/plugin/carbondata
+  connector.name=carbondata
+  carbondata-store={schema-store-path}
+  ```
+  Replace {schema-store-path} with the absolute path of the parent directory of the schema.
+  For example, if you have a schema named 'default' stored under hdfs://namenode:9000/test/carbondata/,
+  then set carbondata-store=hdfs://namenode:9000/test/carbondata
+
+  If you update the jars or configuration files, make sure you distribute them
+  to all the presto nodes and restart the presto servers on those nodes. The updates
+  will not take effect until the servers are restarted.

 ### Generate CarbonData file

-Please refer to quick start : https://github.com/apache/carbondata/blob/master/docs/quick-start-guide.md
+Please refer to the quick start guide: https://github.com/apache/carbondata/blob/master/docs/quick-start-guide.md.
+A load data statement in Spark can be used to create carbondata tables, after which you can easily find the created
+carbondata files.

 ### Query carbondata in CLI of presto

-* Download presto-cli-0.166-executable.jar
+* Download the presto cli client following https://prestodb.io/docs/current/installation/cli.html

 * Start CLI:

 ```
-$ ./presto-cli-0.166-executable.jar --server localhost:8086 --catalog carbondata --schema default
+$ ./presto --server localhost:8086 --catalog carbondata --schema default
 ```
+  Replace the hostname, port and schema name with your own.
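Beyond the interactive CLI, the same catalog can also be queried programmatically through the Presto JDBC driver. Below is a minimal sketch, assuming the presto-jdbc driver (matching your server version, e.g. 0.166) is on the classpath, the host and port match the config above, and a hypothetical table named example_table exists in the default schema:

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CarbondataJdbcExample {
  public static void main(String[] args) throws Exception {
    // URL format: jdbc:presto://<host>:<port>/<catalog>/<schema>
    // Replace host, port and schema with your own deployment, as above.
    String url = "jdbc:presto://localhost:8086/carbondata/default";
    // Presto requires a user name; no password is needed over plain HTTP.
    try (Connection conn = DriverManager.getConnection(url, "test", null);
         Statement stmt = conn.createStatement();
         // 'example_table' is a placeholder; use a table that exists in your store.
         ResultSet rs = stmt.executeQuery("SELECT * FROM example_table LIMIT 10")) {
      while (rs.next()) {
        System.out.println(rs.getString(1)); // print the first column of each row
      }
    }
  }
}
```

The query goes through exactly the same connector code paths as the CLI, so this is a convenient way to smoke-test the plugin from an application.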
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index d97f19e..d557920 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -71,7 +71,7 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
       ConnectorRecordSetProvider connectorRecordSet =
           injector.getInstance(ConnectorRecordSetProvider.class);
-      ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
+      ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);

       return new CarbondataConnector(lifeCycleManager, metadata,
           new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index ba8d9b5..f0a8428 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -17,19 +17,22 @@

 package org.apache.carbondata.presto.impl;

-import java.util.List;
-
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;

+import java.util.List;
+
+/**
+ * CarbonLocalInputSplit represents a block; it contains a set of blocklets.
+ */
 public class CarbonLocalInputSplit {

   private static final long serialVersionUID = 3520344046772190207L;

   private String segmentId;
   private String path;
-  private long start;
-  private long length;
-  private List<String> locations;
+  private long start; // the start offset of the block in a carbondata file.
+  private long length; // the length of the block.
+  private List<String> locations; // the locations of the different replicas of the block.
   private short version;

 /**
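The Jackson annotations on this class are what let Presto serialize a split on the coordinator and rebuild it on a worker. A self-contained sketch of the same round-trip pattern follows; BlockSplit and its fields are illustrative stand-ins, not the actual CarbonLocalInputSplit:

```
import java.util.Arrays;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SplitSerializationExample {

  public static class BlockSplit {
    private final String path;            // file the block belongs to
    private final long start;             // start offset of the block within the file
    private final long length;            // length of the block in bytes
    private final List<String> locations; // hosts holding a replica of the block

    @JsonCreator
    public BlockSplit(@JsonProperty("path") String path, @JsonProperty("start") long start,
        @JsonProperty("length") long length, @JsonProperty("locations") List<String> locations) {
      this.path = path;
      this.start = start;
      this.length = length;
      this.locations = locations;
    }

    @JsonProperty public String getPath() { return path; }
    @JsonProperty public long getStart() { return start; }
    @JsonProperty public long getLength() { return length; }
    @JsonProperty public List<String> getLocations() { return locations; }
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    BlockSplit split = new BlockSplit("/store/default/t1/part-0.carbondata", 0L, 1024L,
        Arrays.asList("node1", "node2"));
    String json = mapper.writeValueAsString(split);             // done on the coordinator
    BlockSplit copy = mapper.readValue(json, BlockSplit.class); // done on a worker
    System.out.println(json);
    System.out.println("rebuilt split for " + copy.getPath());
  }
}
```

The immutable fields plus @JsonCreator constructor keep the split a plain value object, which is why the class needs no custom serializer.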
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
index 45755d1..2a4db14 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
@@ -23,7 +23,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;

 /**
- * Caching Carbon meta(e.g. TableIdentifier, TablePath, TableInfo, CarbonTable) in Class CarbonTableReader
+ * Caching metadata of CarbonData (e.g. TableIdentifier, TablePath, TableInfo, CarbonTable) in class CarbonTableReader
  * to speed up query
  */
 public class CarbonTableCacheModel {
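CarbonTableCacheModel is the value type of the cache in CarbonTableReader, which reads table metadata from the file system only once and serves every later query from memory. A minimal sketch of that read-once pattern; MetadataCache and TableMeta are illustrative names, not the actual carbondata classes:

```
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MetadataCacheExample {

  // Stand-in for the cached metadata (identifier, table path, TableInfo, CarbonTable, ...).
  static class TableMeta {
    final String schema;
    final String table;

    TableMeta(String schema, String table) {
      this.schema = schema;
      this.table = table;
    }
  }

  private final Map<String, TableMeta> cache = new ConcurrentHashMap<>();

  // Metadata is loaded from the store only on the first request for a table;
  // every later request for the same table is served from memory.
  TableMeta getOrLoad(String schema, String table) {
    return cache.computeIfAbsent(schema + "." + table, key -> loadFromStore(schema, table));
  }

  private TableMeta loadFromStore(String schema, String table) {
    // In CarbonTableReader this is where the schema file is read and parsed.
    return new TableMeta(schema, table);
  }

  public static void main(String[] args) {
    MetadataCacheExample reader = new MetadataCacheExample();
    TableMeta first = reader.getOrLoad("default", "t1");  // triggers the load
    TableMeta second = reader.getOrLoad("default", "t1"); // served from cache
    System.out.println(first == second); // true: same cached instance
  }
}
```

The actual reader guards the load with an explicit containsKey check before populating the map; computeIfAbsent merely compresses the same check-then-load step into one atomic call.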
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b699ee6f/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index c328a64..54832f5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -72,15 +72,31 @@ import static java.util.Objects.requireNonNull;
  * 2:FileFactory, (physic table file)
  * 3:CarbonCommonFactory, (offer some )
  * 4:DictionaryFactory, (parse dictionary util)
+ *
+ * Currently, it is mainly used to parse the metadata of the tables under
+ * the configured carbondata-store path and to filter the relevant
+ * input splits with the given query predicates.
  */
 public class CarbonTableReader {

   private CarbonTableConfig config;
+
+  /**
+   * The names of the tables under the schema (this.carbonFileList).
+   */
   private List<SchemaTableName> tableList;
+
+  /**
+   * carbonFileList represents the store path of the schema, which is configured as carbondata-store
+   * in the CarbonData catalog file ($PRESTO_HOME$/etc/catalog/carbondata.properties).
+   */
   private CarbonFile carbonFileList;
   private FileFactory.FileType fileType;

-  // A cache for Carbon reader
+  /**
+   * A cache for the Carbon reader; with this cache,
+   * the metadata of a table is read from the file system only once.
+   */
   private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;

   @Inject public CarbonTableReader(CarbonTableConfig config) {
@@ -88,9 +104,14 @@ public class CarbonTableReader {
     this.cc = new ConcurrentHashMap<>();
   }

-  // for worker node to initialize carbon metastore
+  /**
+   * For a presto worker node to initialize the metadata cache of a table.
+   * @param table the schema and name of the table.
+   * @return the cache model of the table.
+   */
   public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
     if (!cc.containsKey(table)) {
+      // if this table is not cached, try to read the metadata of the table and cache it.
       try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
           FileFactory.class.getClassLoader())) {
         if (carbonFileList == null) {
@@ -110,17 +131,26 @@ public class CarbonTableReader {
     else return null;
   }

+  /**
+   * Return the schema names under the schema store path (this.carbonFileList).
+   * @return a list of schema names.
+   */
   public List<String> getSchemaNames() {
     return updateSchemaList();
   }

-  // default PathFilter
+  // default PathFilter, accepts files in the carbondata format (with the .carbondata extension).
   private static final PathFilter DefaultFilter = new PathFilter() {
     @Override public boolean accept(Path path) {
       return CarbonTablePath.isCarbonDataFile(path.getName());
     }
   };

+  /**
+   * Get the CarbonFile instance which represents the store path in the configuration,
+   * and assign it to this.carbonFileList.
+   * @return true once the store path has been resolved.
+   */
   public boolean updateCarbonFile() {
     if (carbonFileList == null) {
       fileType = FileFactory.getFileType(config.getStorePath());
@@ -133,6 +163,10 @@ public class CarbonTableReader {
     return true;
   }

+  /**
+   * Return the schema names under the schema store path (this.carbonFileList).
+   * @return a list of schema names.
+   */
   public List<String> updateSchemaList() {
     updateCarbonFile();

@@ -143,13 +177,23 @@ public class CarbonTableReader {
     } else return ImmutableList.of();
   }

+  /**
+   * Get the names of the tables in the given schema.
+   * @param schema the name of the schema.
+   * @return a set of table names.
+   */
   public Set<String> getTableNames(String schema) {
     requireNonNull(schema, "schema is null");
     return updateTableList(schema);
   }

-  public Set updateTableList(String dbName) {
-    List schema = Stream.of(carbonFileList.listFiles()).filter(a -> dbName.equals(a.getName()))
+  /**
+   * Get the names of the tables in the given schema.
+   * @param schemaName the name of the schema.
+   * @return a set of table names.
+   */
+  public Set<String> updateTableList(String schemaName) {
+    List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName()))
         .collect(Collectors.toList());
     if (schema.size() > 0) {
       return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
@@ -157,6 +201,11 @@ public class CarbonTableReader {
     } else return ImmutableSet.of();
   }

+  /**
+   * Get the CarbonTable instance of the given table.
+   * @param schemaTableName the name of the given table.
+   * @return the CarbonTable instance of the table.
+   */
   public CarbonTable getTable(SchemaTableName schemaTableName) {
     try {
       updateSchemaTables();
@@ -170,6 +219,11 @@ public class CarbonTableReader {
     return table;
   }

+  /**
+   * Find all the tables under the schema store path (this.carbonFileList)
+   * and cache all the table names in this.tableList. Notice that whenever this method
+   * is called, it clears this.tableList and repopulates the list by reading the files.
+   */
   public void updateSchemaTables() {
     // update logic determine later
     if (carbonFileList == null) {
@@ -185,6 +239,12 @@ public class CarbonTableReader {
     }
   }

+  /**
+   * Find the table with the given name and build a CarbonTable instance for it.
+   * This method should be called after this.updateSchemaTables().
+   * @param schemaTableName the name of the given table.
+   * @return the CarbonTable instance of the table, or null if it is not found.
+   */
   private CarbonTable loadTableMetadata(SchemaTableName schemaTableName) {
     for (SchemaTableName table : tableList) {
       if (!table.equals(schemaTableName)) continue;
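The schema and table discovery above is purely a directory-layout convention: each schema is a directory under the store path, and each table is a directory under its schema. A self-contained sketch of that convention, using java.nio.file in place of carbondata's FileFactory/CarbonFile abstractions (the store path below is the local example value from the README and would be replaced with your own):

```
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class StoreLayoutExample {

  // List the names of the sub-directories of 'dir'; each schema is a directory
  // under the store, and each table is a directory under its schema.
  static List<String> listDirNames(Path dir) throws IOException {
    List<String> names = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, Files::isDirectory)) {
      for (Path p : stream) {
        names.add(p.getFileName().toString());
      }
    }
    return names;
  }

  public static void main(String[] args) throws IOException {
    Path store = Paths.get("/Users/apple/DEMO/presto_test/data"); // the carbondata-store value
    for (String schema : listDirNames(store)) {
      System.out.println(schema + " -> tables: " + listDirNames(store.resolve(schema)));
    }
  }
}
```

This also explains why carbondata-store must point at the parent directory of the schema: one level too deep and the schema names disappear from the listing.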
@@ -195,7 +255,9 @@ public class CarbonTableReader {
   }

   /**
-   * parse carbon metadata into cc(CarbonTableReader cache)
+   * Read the metadata of the given table and cache it in this.cc (the CarbonTableReader cache).
+   * @param table the name of the given table.
+   * @return the CarbonTable instance which contains all the needed metadata of the table.
    */
   public CarbonTable parseCarbonMetadata(SchemaTableName table) {
     CarbonTable result = null;
     try {
       CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
       if (cache.isValid()) return cache.carbonTable;

-      //Step1: get table meta path, load carbon table param
+      // If the table is not previously cached, then:
+
+      // Step 1: get the store path of the table and cache it.
       String storePath = config.getStorePath();
+      // create the table identifier. the table id is randomly generated.
       cache.carbonTableIdentifier =
           new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
               UUID.randomUUID().toString());
+      // get the store path of the table.
       cache.carbonTablePath =
           PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
+      // cache the table.
       cc.put(table, cache);

-      //Step2: check file existed? read schema file
+      // Step 2: read the metadata (tableInfo) of the table.
       ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+        // TBase is used to read and write thrift objects.
+        // TableInfo is a kind of TBase used to read and write table information.
+        // TableInfo is generated by thrift; see schema.thrift under format/src/main/thrift for details.
         public TBase create() {
           return new org.apache.carbondata.format.TableInfo();
         }
@@ -225,14 +295,16 @@ public class CarbonTableReader {
           (org.apache.carbondata.format.TableInfo) thriftReader.read();
       thriftReader.close();

-      // Step3: Transform Format Level TableInfo to Code Level TableInfo
+      // Step 3: convert the format-level TableInfo to the code-level TableInfo.
       SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      // wrapperTableInfo is the code-level information of a table in carbondata core,
+      // different from the thrift TableInfo.
       TableInfo wrapperTableInfo = schemaConverter
           .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
               storePath);
       wrapperTableInfo.setMetaDataFilepath(
           CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
-      // Step4: Load metadata info into CarbonMetadata
+
+      // Step 4: load the metadata info into CarbonMetadata.
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);

       cache.tableInfo = wrapperTableInfo;
@@ -246,6 +318,13 @@ public class CarbonTableReader {
     return result;
   }

+  /**
+   * Apply filters to the table and get the valid input splits of the table.
+   * @param tableCacheModel the cache model of the table.
+   * @param filters the pushed-down filter expression.
+   * @return a list of valid input splits.
+   * @throws Exception
+   */
   public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
       Expression filters) throws Exception {
@@ -332,7 +411,16 @@ public class CarbonTableReader {
   }

   /**
-   * get data blocks of given segment
+   * Get all the data blocks of a given segment.
+   * @param filterExpressionProcessor the processor used to evaluate the filter.
+   * @param absoluteTableIdentifier the identifier of the table.
+   * @param tablePath the path of the table.
+   * @param resolver the resolved filter.
+   * @param segmentId the id of the segment.
+   * @param cacheClient the cache client of the segment index.
+   * @param updateStatusManager the update status manager of the segment.
+   * @return a list of data blocks.
+   * @throws IOException
    */
   private List<DataRefNode> getDataBlocksOfSegment(
       FilterExpressionProcessor filterExpressionProcessor,
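The TBaseCreator callback in Step 2 exists because ThriftReader is generic: the reader drives deserialization, but only the caller knows which concrete thrift class to allocate. A minimal sketch of that factory-callback shape in plain Java; Creator, GenericReader and RecordStub are illustrative names, not the carbondata API:

```
import java.util.function.Consumer;

public class CreatorCallbackExample {

  // The caller supplies this factory so the generic reader can allocate
  // the concrete record type it is about to populate (cf. TBaseCreator.create()).
  interface Creator<T> {
    T create();
  }

  static class GenericReader {
    // 'populate' stands in for the thrift deserialization that fills the record.
    static <T> T read(Creator<T> creator, Consumer<T> populate) {
      T record = creator.create();
      populate.accept(record);
      return record;
    }
  }

  // Stand-in for a thrift-generated class such as org.apache.carbondata.format.TableInfo.
  static class RecordStub {
    String tableName;
  }

  public static void main(String[] args) {
    RecordStub info = GenericReader.read(RecordStub::new, r -> r.tableName = "default.t1");
    System.out.println(info.tableName);
  }
}
```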
@@ -380,6 +468,16 @@ public class CarbonTableReader {
     return false;
   }

+  /**
+   * Build and load the B-trees of the segment.
+   * @param absoluteTableIdentifier the identifier of the table.
+   * @param tablePath the path of the table.
+   * @param segmentId the id of the segment.
+   * @param cacheClient the cache client of the segment index.
+   * @param updateStatusManager the update status manager of the segment.
+   * @return a map from task bucket to the loaded index.
+   * @throws IOException
+   */
   private Map getSegmentAbstractIndexs(/*JobContext job,*/
       AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath, String segmentId,
       CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
@@ -491,6 +589,13 @@ public class CarbonTableReader {
     return false;
   }

+  /**
+   * Get the input splits of a set of carbondata files.
+   * @param fileStatusList the file statuses of the set of carbondata files.
+   * @param targetSystem the hdfs FileSystem.
+   * @return a list of input splits.
+   * @throws IOException
+   */
   private List<CarbonLocalInputSplit> getSplit(List<FileStatus> fileStatusList,
       FileSystem targetSystem) throws IOException {
@@ -501,6 +606,7 @@ public class CarbonTableReader {
     while (true) {
       while (true) {
         while (split.hasNext()) {
+          // file is a carbondata file.
           FileStatus file = (FileStatus) split.next();
           Path path = file.getPath();
           long length = file.getLen();
@@ -520,7 +626,7 @@ public class CarbonTableReader {
           int blkIndex;
           for (
               bytesRemaining = length;
-              (double) bytesRemaining / (double) splitSize > 1.1D;
+              (double) bytesRemaining / (double) splitSize > 1.1D; // while more than one split remains.
               bytesRemaining -= splitSize) {
             blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
             splits.add(this.makeSplit(path, length - bytesRemaining, splitSize,
@@ -552,6 +658,15 @@ public class CarbonTableReader {
     return new String[] { "0" };
   }

+  /**
+   * Get the file statuses of all the carbondata files with a segmentId in segmentsToConsider
+   * under the tablePath, and add them to the result.
+   * @param segmentsToConsider the ids of the segments to consider.
+   * @param tablePath the path of the table.
+   * @param result the list the found file statuses are added to.
+   * @return the FileSystem instance used in this function.
+   * @throws IOException
+   */
   private FileSystem getFileStatusOfSegments(String[] segmentsToConsider, CarbonTablePath tablePath,
       List<FileStatus> result) throws IOException {
     String[] partitionsToConsider = getValidPartitions();
@@ -584,6 +699,7 @@ public class CarbonTableReader {
             LocatedFileStatus stat = iter.next();
             if (DefaultFilter.accept(stat.getPath())) {
               if (stat.isDirectory()) {
+                // DefaultFilter accepts carbondata files.
                 addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
               } else {
                 result.add(stat);
@@ -598,6 +714,15 @@ public class CarbonTableReader {
     return fs;
   }

+  /**
+   * Get the FileStatus of all the carbondata files under the path recursively,
+   * and add the file statuses to the result.
+   * @param result the list the found file statuses are added to.
+   * @param fs the FileSystem instance.
+   * @param path the path to search under.
+   * @param inputFilter the filter used to determine whether a path points to a carbondata file.
+   * @throws IOException
+   */
   protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path,
       PathFilter inputFilter) throws IOException {
     RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
@@ -616,7 +741,10 @@ public class CarbonTableReader {
   }

   /**
-   * get data blocks of given btree
+   * Get the data blocks of a b-tree. The root node of the b-tree is abstractIndex.dataRefNode.
+   * BTreeNode is a subclass of DataRefNode.
+   * @param abstractIndex the loaded index of a segment.
+   * @return a list of data blocks.
    */
   private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
     List<DataRefNode> blocks = new LinkedList<>();
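The 1.1 slack factor in the getSplit loop above prevents a degenerate final split: a full-size split is carved off only while more than 1.1x splitSize remains, so whenever at least one full split has been carved, the remainder left for the last split is always more than 10% of splitSize. A self-contained sketch of just that loop; computeSplits and the long[] pairs are illustrative, whereas the real code builds CarbonLocalInputSplit objects with block locations:

```
import java.util.ArrayList;
import java.util.List;

public class SplitSlackExample {

  static final double SPLIT_SLOP = 1.1; // the same 10% slack as the loop above

  // Carve a file of 'length' bytes into (offset, size) pairs of roughly splitSize,
  // merging a small tail into the last split instead of emitting a tiny one.
  static List<long[]> computeSplits(long length, long splitSize) {
    List<long[]> splits = new ArrayList<>();
    long bytesRemaining = length;
    while ((double) bytesRemaining / splitSize > SPLIT_SLOP) {
      splits.add(new long[] { length - bytesRemaining, splitSize });
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      splits.add(new long[] { length - bytesRemaining, bytesRemaining });
    }
    return splits;
  }

  public static void main(String[] args) {
    // A 205-byte file with splitSize 100 yields splits of 100 and 105 bytes,
    // rather than 100, 100 and a degenerate 5-byte tail.
    for (long[] s : computeSplits(205L, 100L)) {
      System.out.println("offset=" + s[0] + " size=" + s[1]);
    }
  }
}
```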