From: jackylk@apache.org
To: commits@carbondata.apache.org
Reply-To: dev@carbondata.apache.org
Date: Wed, 12 Jul 2017 15:05:21 -0000
Message-Id: <8bc624b64c07464da53c2b8ace84cdc2@git.apache.org>
In-Reply-To: <5ad76c37a59f4580b1f47f8c5a852621@git.apache.org>
References: <5ad76c37a59f4580b1f47f8c5a852621@git.apache.org>
Subject: [45/50] [abbrv] carbondata git commit: [CARBONDATA-1271] Enhanced Performance for Hive Integration with Carbondata

[CARBONDATA-1271] Enhanced Performance for Hive Integration with Carbondata

This closes #1142

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cbe14197
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cbe14197
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cbe14197

Branch: refs/heads/datamap
Commit: cbe141976a53a558b84d6e31baf3ec54a9bc38cc
Parents: 285ce72
Author: Bhavya
Authored: Thu Jul 6 11:53:03 2017 +0530
Committer: chenliang613
Committed: Wed Jul 12 17:40:11 2017 +0800

----------------------------------------------------------------------
 .../core/stats/QueryStatisticsRecorderImpl.java |  80 +++---
 .../carbondata/hadoop/CarbonInputFormat.java    |   7 +-
 .../carbondata/hive/CarbonArrayInspector.java   |   4 -
 .../hive/CarbonDictionaryDecodeReadSupport.java | 288 +++++++++++++++++++
 .../carbondata/hive/CarbonHiveInputSplit.java   |  23 +-
 .../carbondata/hive/CarbonHiveRecordReader.java |  67 ++---
 .../apache/carbondata/hive/CarbonHiveSerDe.java |  36 +--
 .../hive/MapredCarbonInputFormat.java           | 129 ++++++---
 .../hive/server/HiveEmbeddedServer2.java        |   1 +
 9 files changed, 477 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java index f84a674..ffb7d7f 100644 --- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java @@ -101,45 +101,47 @@ public class QueryStatisticsRecorderImpl implements QueryStatisticsRecorder, Ser long scannedPages = 0; try { for (QueryStatistic statistic : queryStatistics) { - switch (statistic.getMessage()) { - case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR: - load_blocks_time += statistic.getTimeTaken(); - break; - case QueryStatisticsConstants.SCAN_BLOCKlET_TIME: - scan_blocks_time += statistic.getCount(); - break; - case QueryStatisticsConstants.SCAN_BLOCKS_NUM: - scan_blocks_num += statistic.getCount(); - break; - case QueryStatisticsConstants.LOAD_DICTIONARY: - load_dictionary_time += statistic.getTimeTaken(); - break; - case QueryStatisticsConstants.RESULT_SIZE: - result_size += statistic.getCount(); - break; - case QueryStatisticsConstants.EXECUTOR_PART: - total_executor_time += statistic.getTimeTaken(); - break; - case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM: - total_blocklet = statistic.getCount(); - break; - case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM: - valid_scan_blocklet = statistic.getCount(); - break; - case QueryStatisticsConstants.VALID_PAGE_SCANNED: - valid_pages_blocklet = statistic.getCount(); - break; - case QueryStatisticsConstants.TOTAL_PAGE_SCANNED: - total_pages = statistic.getCount(); - break; - case QueryStatisticsConstants.READ_BLOCKlET_TIME: - readTime = statistic.getCount(); - break; - case QueryStatisticsConstants.PAGE_SCANNED: - scannedPages = statistic.getCount(); - break; - default: - break; + if (statistic.getMessage() != null) { + switch (statistic.getMessage()) { + case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR: + load_blocks_time += statistic.getTimeTaken(); + break; + case QueryStatisticsConstants.SCAN_BLOCKlET_TIME: + scan_blocks_time += statistic.getCount(); + break; + case QueryStatisticsConstants.SCAN_BLOCKS_NUM: + scan_blocks_num += statistic.getCount(); + break; + case QueryStatisticsConstants.LOAD_DICTIONARY: + load_dictionary_time += statistic.getTimeTaken(); + break; + case QueryStatisticsConstants.RESULT_SIZE: + result_size += statistic.getCount(); + break; + case QueryStatisticsConstants.EXECUTOR_PART: + total_executor_time += statistic.getTimeTaken(); + break; + case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM: + total_blocklet = statistic.getCount(); + break; + case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM: + valid_scan_blocklet = statistic.getCount(); + break; + case QueryStatisticsConstants.VALID_PAGE_SCANNED: + valid_pages_blocklet = statistic.getCount(); + break; + case QueryStatisticsConstants.TOTAL_PAGE_SCANNED: + total_pages = statistic.getCount(); + break; + case QueryStatisticsConstants.READ_BLOCKlET_TIME: + readTime = statistic.getCount(); + break; + case QueryStatisticsConstants.PAGE_SCANNED: + scannedPages = statistic.getCount(); + break; + default: + break; + } } } String headers = http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index 1e69648..16b5d69 100644 --- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -444,9 +444,14 @@ public class CarbonInputFormat extends FileInputFormat { } } } + + // For Hive integration if we have to get the stats we have to fetch hive.query.id + String query_id = job.getConfiguration().get("query.id") != null ? + job.getConfiguration().get("query.id") : + job.getConfiguration().get("hive.query.id"); statistic .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis()); - recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id")); + recorder.recordStatisticsForDriver(statistic, query_id); return resultFilterredBlocks; } finally { // clean up the access count for a segment as soon as its usage is complete so that in http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java index 49e068a..b26c959 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java @@ -18,7 +18,6 @@ package org.apache.carbondata.hive; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -122,9 +121,6 @@ class CarbonArrayInspector implements SettableListObjectInspector { final Writable[] array = ((ArrayWritable) subObj).get(); final List list = Arrays.asList(array); - - Collections.addAll(list, array); - return list; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java new file mode 100644 index 0000000..bc66d49 --- /dev/null +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.hive; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; + +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; + +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.util.GenericArrayData; + +/** + * This is the class to decode dictionary encoded column data back to its original value. + */ +public class CarbonDictionaryDecodeReadSupport implements CarbonReadSupport { + + protected Dictionary[] dictionaries; + + protected DataType[] dataTypes; + /** + * carbon columns + */ + protected CarbonColumn[] carbonColumns; + + protected Writable[] writableArr; + + /** + * This initialization is done inside executor task + * for column dictionary involved in decoding. 
+ * + * @param carbonColumns column list + * @param absoluteTableIdentifier table identifier + */ + @Override public void initialize(CarbonColumn[] carbonColumns, + AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException { + this.carbonColumns = carbonColumns; + dictionaries = new Dictionary[carbonColumns.length]; + dataTypes = new DataType[carbonColumns.length]; + for (int i = 0; i < carbonColumns.length; i++) { + if (carbonColumns[i].hasEncoding(Encoding.DICTIONARY) && !carbonColumns[i] + .hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumns[i].isComplex()) { + CacheProvider cacheProvider = CacheProvider.getInstance(); + Cache forwardDictionaryCache = cacheProvider + .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath()); + dataTypes[i] = carbonColumns[i].getDataType(); + dictionaries[i] = forwardDictionaryCache.get( + new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(), + carbonColumns[i].getColumnIdentifier(), dataTypes[i])); + } else { + dataTypes[i] = carbonColumns[i].getDataType(); + } + } + } + + @Override public T readRow(Object[] data) { + assert (data.length == dictionaries.length); + writableArr = new Writable[data.length]; + for (int i = 0; i < dictionaries.length; i++) { + if (dictionaries[i] != null) { + data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]); + } + try { + writableArr[i] = createWritableObject(data[i], carbonColumns[i]); + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + return (T) writableArr; + } + + /** + * to book keep the dictionary cache or update access count for each + * column involved during decode, to facilitate LRU cache policy if memory + * threshold is reached + */ + @Override public void close() { + if (dictionaries == null) { + return; + } + for (int i = 0; i < dictionaries.length; i++) { + CarbonUtil.clearDictionaryCache(dictionaries[i]); + } + } + + /** + * To Create the Writable from the CarbonData data + * + * @param obj + * @param carbonColumn + * @return + * @throws IOException + */ + private Writable createWritableObject(Object obj, CarbonColumn carbonColumn) throws IOException { + DataType dataType = carbonColumn.getDataType(); + switch (dataType) { + case STRUCT: + return createStruct(obj, carbonColumn); + case ARRAY: + return createArray(obj, carbonColumn); + default: + return createWritablePrimitive(obj, carbonColumn); + } + } + + /** + * Create Array Data for Array Datatype + * + * @param obj + * @param carbonColumn + * @return + * @throws IOException + */ + private ArrayWritable createArray(Object obj, CarbonColumn carbonColumn) throws IOException { + if (obj instanceof GenericArrayData) { + Object[] objArray = ((GenericArrayData) obj).array(); + List childCarbonDimensions = null; + CarbonDimension arrayDimension = null; + if (carbonColumn.isDimension() && carbonColumn.getColumnSchema().getNumberOfChild() > 0) { + childCarbonDimensions = ((CarbonDimension) carbonColumn).getListOfChildDimensions(); + arrayDimension = childCarbonDimensions.get(0); + } + List array = new ArrayList(); + if (objArray != null) { + for (int i = 0; i < objArray.length; i++) { + Object curObj = objArray[i]; + Writable newObj = createWritableObject(curObj, arrayDimension); + array.add(newObj); + } + } + if (array.size() > 0) { + ArrayWritable subArray = new ArrayWritable(Writable.class, + (Writable[]) array.toArray(new Writable[array.size()])); + return new ArrayWritable(Writable.class, new Writable[] { subArray }); + } + } + 
return null; + } + + /** + * Create the Struct data for the Struct Datatype + * + * @param obj + * @param carbonColumn + * @return + * @throws IOException + */ + private ArrayWritable createStruct(Object obj, CarbonColumn carbonColumn) throws IOException { + if (obj instanceof GenericInternalRow) { + Object[] objArray = ((GenericInternalRow) obj).values(); + List childCarbonDimensions = null; + if (carbonColumn.isDimension() && carbonColumn.getColumnSchema().getNumberOfChild() > 0) { + childCarbonDimensions = ((CarbonDimension) carbonColumn).getListOfChildDimensions(); + } + Writable[] arr = new Writable[objArray.length]; + for (int i = 0; i < objArray.length; i++) { + + arr[i] = createWritableObject(objArray[i], childCarbonDimensions.get(i)); + } + return new ArrayWritable(Writable.class, arr); + } + throw new IOException("DataType not supported in Carbondata"); + } + + /** + * This method will create the Writable Objects for primitives. + * + * @param obj + * @param carbonColumn + * @return + * @throws IOException + */ + private Writable createWritablePrimitive(Object obj, CarbonColumn carbonColumn) + throws IOException { + DataType dataType = carbonColumn.getDataType(); + if (obj == null) { + return null; + } + switch (dataType) { + case NULL: + return null; + case DOUBLE: + return new DoubleWritable((double) obj); + case INT: + return new IntWritable((int) obj); + case LONG: + return new LongWritable((long) obj); + case SHORT: + return new ShortWritable((Short) obj); + case DATE: + return new DateWritable(new Date((Integer) obj)); + case TIMESTAMP: + return new TimestampWritable(new Timestamp((long) obj)); + case STRING: + return new Text(obj.toString()); + case DECIMAL: + return new HiveDecimalWritable( + HiveDecimal.create(new java.math.BigDecimal(obj.toString()))); + } + throw new IOException("Unknown primitive : " + dataType.getName()); + } + + /** + * If we need to use the same Writable[] then we can use this method + * + * @param writable + * @param obj + * @param carbonColumn + * @throws IOException + */ + private void setPrimitive(Writable writable, Object obj, CarbonColumn carbonColumn) + throws IOException { + DataType dataType = carbonColumn.getDataType(); + if (obj == null) { + writable.write(null); + } + switch (dataType) { + case DOUBLE: + ((DoubleWritable) writable).set((double) obj); + break; + case INT: + ((IntWritable) writable).set((int) obj); + break; + case LONG: + ((LongWritable) writable).set((long) obj); + break; + case SHORT: + ((ShortWritable) writable).set((short) obj); + break; + case DATE: + ((DateWritable) writable).set(new Date((Long) obj)); + break; + case TIMESTAMP: + ((TimestampWritable) writable).set(new Timestamp((long) obj)); + break; + case STRING: + ((Text) writable).set(obj.toString()); + break; + case DECIMAL: + ((HiveDecimalWritable) writable) + .set(HiveDecimal.create(new java.math.BigDecimal(obj.toString()))); + } + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java index bfe4d27..b922295 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java @@ -113,8 +113,7 @@ 
public class CarbonHiveInputSplit extends FileSplit } public static CarbonHiveInputSplit from(String segmentId, FileSplit split, - ColumnarFormatVersion version) - throws IOException { + ColumnarFormatVersion version) throws IOException { return new CarbonHiveInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(), split.getLocations(), version); } @@ -151,8 +150,7 @@ public class CarbonHiveInputSplit extends FileSplit return segmentId; } - @Override - public void readFields(DataInput in) throws IOException { + @Override public void readFields(DataInput in) throws IOException { super.readFields(in); this.segmentId = in.readUTF(); this.version = ColumnarFormatVersion.valueOf(in.readShort()); @@ -162,10 +160,10 @@ public class CarbonHiveInputSplit extends FileSplit for (int i = 0; i < numInvalidSegment; i++) { invalidSegments.add(in.readUTF()); } + this.numberOfBlocklets = in.readInt(); } - @Override - public void write(DataOutput out) throws IOException { + @Override public void write(DataOutput out) throws IOException { super.write(out); out.writeUTF(segmentId); out.writeShort(version.number()); @@ -174,6 +172,7 @@ public class CarbonHiveInputSplit extends FileSplit for (String invalidSegment : invalidSegments) { out.writeUTF(invalidSegment); } + out.writeInt(numberOfBlocklets); } public List getInvalidSegments() { @@ -213,8 +212,7 @@ public class CarbonHiveInputSplit extends FileSplit return bucketId; } - @Override - public int compareTo(Distributable o) { + @Override public int compareTo(Distributable o) { if (o == null) { return -1; } @@ -264,18 +262,15 @@ public class CarbonHiveInputSplit extends FileSplit return 0; } - @Override - public String getBlockPath() { + @Override public String getBlockPath() { return getPath().getName(); } - @Override - public List getMatchedBlocklets() { + @Override public List getMatchedBlocklets() { return null; } - @Override - public boolean fullScan() { + @Override public boolean fullScan() { return true; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java index e4df02e..2a92185 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java @@ -62,6 +62,8 @@ class CarbonHiveRecordReader extends CarbonRecordReader private ArrayWritable valueObj = null; private CarbonObjectInspector objInspector; + private long recordReaderCounter = 0; + private int[] columnIds; public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport readSupport, InputSplit inputSplit, JobConf jobConf) throws IOException { @@ -88,17 +90,12 @@ class CarbonHiveRecordReader extends CarbonRecordReader } catch (QueryExecutionException e) { throw new IOException(e.getMessage(), e.getCause()); } - if (valueObj == null) { - valueObj = - new ArrayWritable(Writable.class, new Writable[queryModel.getProjectionColumns().length]); - } - final TypeInfo rowTypeInfo; final List columnNames; List columnTypes; // Get column names and sort order final String colIds = conf.get("hive.io.file.readcolumn.ids"); - final String columnNameProperty = conf.get("hive.io.file.readcolumn.names"); + final 
String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS); final String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES); if (columnNameProperty.length() == 0) { @@ -111,47 +108,39 @@ class CarbonHiveRecordReader extends CarbonRecordReader } else { columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); } + + if (valueObj == null) { + valueObj = new ArrayWritable(Writable.class, new Writable[columnTypes.size()]); + } + if (!colIds.equals("")) { String[] arraySelectedColId = colIds.split(","); List reqColTypes = new ArrayList(); - - for (String anArrayColId : arraySelectedColId) { - reqColTypes.add(columnTypes.get(Integer.parseInt(anArrayColId))); + columnIds = new int[arraySelectedColId.length]; + int columnId = 0; + for (int j = 0; j < arraySelectedColId.length; j++) { + columnId = Integer.parseInt(arraySelectedColId[j]); + columnIds[j] = columnId; } - // Create row related objects - rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, reqColTypes); - this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo); - } else { - rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); - this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo); } + + rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); + this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo); } @Override public boolean next(Void aVoid, ArrayWritable value) throws IOException { if (carbonIterator.hasNext()) { Object obj = readSupport.readRow(carbonIterator.next()); - ArrayWritable tmpValue; - try { - tmpValue = createArrayWritable(obj); - } catch (SerDeException se) { - throw new IOException(se.getMessage(), se.getCause()); - } - - if (value != tmpValue) { - final Writable[] arrValue = value.get(); - final Writable[] arrCurrent = tmpValue.get(); - if (valueObj != null && arrValue.length == arrCurrent.length) { - System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length); - } else { - if (arrValue.length != arrCurrent.length) { - throw new IOException( - "CarbonHiveInput : size of object differs. 
Value" + " size : " + arrValue.length - + ", Current Object size : " + arrCurrent.length); - } else { - throw new IOException("CarbonHiveInput can not support RecordReaders that" - + " don't return same key & value & value is null"); - } + recordReaderCounter++; + Writable[] objArray = (Writable[]) obj; + Writable[] sysArray = new Writable[value.get().length]; + if (columnIds != null && columnIds.length > 0 && objArray.length == columnIds.length) { + for (int i = 0; i < columnIds.length; i++) { + sysArray[columnIds[i]] = objArray[i]; } + value.set(sysArray); + } else { + value.set(objArray); } return true; } else { @@ -159,10 +148,6 @@ class CarbonHiveRecordReader extends CarbonRecordReader } } - private ArrayWritable createArrayWritable(Object obj) throws SerDeException { - return createStruct(obj, objInspector); - } - @Override public Void createKey() { return null; } @@ -172,7 +157,7 @@ class CarbonHiveRecordReader extends CarbonRecordReader } @Override public long getPos() throws IOException { - return 0; + return recordReaderCounter; } @Override public float getProgress() throws IOException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java index f66f3ed..2980ad3 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java @@ -79,11 +79,9 @@ class CarbonHiveSerDe extends AbstractSerDe { final TypeInfo rowTypeInfo; final List columnNames; - final List reqColNames; final List columnTypes; // Get column names and sort order assert configuration != null; - final String colIds = configuration.get("hive.io.file.readcolumn.ids"); final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); @@ -98,29 +96,17 @@ class CarbonHiveSerDe extends AbstractSerDe { } else { columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); } - if (colIds != null && !colIds.equals("")) { - reqColNames = new ArrayList(); - - String[] arraySelectedColId = colIds.split(","); - List reqColTypes = new ArrayList(); - for (String anArrayColId : arraySelectedColId) { - reqColNames.add(columnNames.get(Integer.parseInt(anArrayColId))); - reqColTypes.add(columnTypes.get(Integer.parseInt(anArrayColId))); - } - // Create row related objects - rowTypeInfo = TypeInfoFactory.getStructTypeInfo(reqColNames, reqColTypes); - this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo); - } - else { - // Create row related objects - rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); - this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo); - - // Stats part - serializedSize = 0; - deserializedSize = 0; - status = LAST_OPERATION.UNKNOWN; - } + + + + // Create row related objects + rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); + this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo); + + // Stats part + serializedSize = 0; + deserializedSize = 0; + status = LAST_OPERATION.UNKNOWN; } @Override public Class getSerializedClass() { 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java index 7a1c9db..58f25c9 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java @@ -17,12 +17,12 @@ package org.apache.carbondata.hive; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.scan.model.CarbonQueryPlan; @@ -31,8 +31,11 @@ import org.apache.carbondata.hadoop.CarbonInputFormat; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.hadoop.util.SchemaReader; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.io.ArrayWritable; @@ -42,9 +45,11 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.StringUtils; public class MapredCarbonInputFormat extends CarbonInputFormat implements InputFormat, CombineHiveInputFormat.AvoidSplitCombination { + private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table"; @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf); @@ -63,47 +68,64 @@ public class MapredCarbonInputFormat extends CarbonInputFormat @Override public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { - QueryModel queryModel = getQueryModel(jobConf); - CarbonReadSupport readSupport = getReadSupportClass(jobConf); + String path = null; + if (inputSplit instanceof CarbonHiveInputSplit) { + path = ((CarbonHiveInputSplit) inputSplit).getPath().toString(); + } + QueryModel queryModel = getQueryModel(jobConf, path); + CarbonReadSupport readSupport = new CarbonDictionaryDecodeReadSupport<>(); return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, jobConf); } - private QueryModel getQueryModel(Configuration configuration) throws IOException { - CarbonTable carbonTable = getCarbonTable(configuration); + /** + * this method will read the schema from the physical file and populate into CARBON_TABLE + * + * @param configuration + * @throws IOException 
+ */ + private static void populateCarbonTable(Configuration configuration, String paths) + throws IOException { + String dirs = configuration.get(INPUT_DIR, ""); + String[] inputPaths = StringUtils.split(dirs); + String validInputPath = null; + if (inputPaths.length == 0) { + throw new InvalidPathException("No input paths specified in job"); + } else { + if (paths != null) { + for (String inputPath : inputPaths) { + if (paths.startsWith(inputPath)) { + validInputPath = inputPath; + break; + } + } + } + } + AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier.fromTablePath(validInputPath); + // read the schema file to get the absoluteTableIdentifier having the correct table id + // persisted in the schema + CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier); + setCarbonTable(configuration, carbonTable); + } + + private static CarbonTable getCarbonTable(Configuration configuration, String path) + throws IOException { + populateCarbonTable(configuration, path); + // read it from schema file in the store + String carbonTableStr = configuration.get(CARBON_TABLE); + return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr); + } + + private QueryModel getQueryModel(Configuration configuration, String path) throws IOException { + CarbonTable carbonTable = getCarbonTable(configuration, path); // getting the table absoluteTableIdentifier from the carbonTable // to avoid unnecessary deserialization StringBuilder colNames = new StringBuilder(); AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); - // query plan includes projection column - String projection = getColumnProjection(configuration); - if (projection == null) { - projection = configuration.get("hive.io.file.readcolumn.names"); - } - if (projection.equals("")) { - List carbonDimensionList = carbonTable.getAllDimensions(); - List carbonMeasureList = carbonTable.getAllMeasures(); - - for (CarbonDimension aCarbonDimensionList : carbonDimensionList) { - colNames = new StringBuilder((colNames + (aCarbonDimensionList.getColName())) + ","); - } - if (carbonMeasureList.size() < 1) { - colNames = new StringBuilder(colNames.substring(0, colNames.lastIndexOf(","))); - } - for (int index = 0; index < carbonMeasureList.size(); index++) { - if (!carbonMeasureList.get(index).getColName().equals("default_dummy_measure")) { - if (index == carbonMeasureList.size() - 1) { - colNames.append(carbonMeasureList.get(index).getColName()); - } else { - colNames = - new StringBuilder((colNames + (carbonMeasureList.get(index).getColName())) + ","); - } - } - } - projection = colNames.toString().trim(); - configuration.set("hive.io.file.readcolumn.names", colNames.toString()); - } + String projection = getProjection(configuration, carbonTable, + identifier.getCarbonTableIdentifier().getTableName()); CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection); QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable); // set the filter to the query model in order to filter blocklet before scan @@ -115,6 +137,45 @@ public class MapredCarbonInputFormat extends CarbonInputFormat return queryModel; } + /** + * Return the Projection for the CarbonQuery. 
+ * + * @param configuration + * @param carbonTable + * @param tableName + * @return + */ + private String getProjection(Configuration configuration, CarbonTable carbonTable, + String tableName) { + // query plan includes projection column + String projection = getColumnProjection(configuration); + if (projection == null) { + projection = configuration.get("hive.io.file.readcolumn.names"); + } + List carbonColumns = carbonTable.getCreateOrderColumn(tableName); + List carbonColumnNames = new ArrayList<>(); + StringBuilder allColumns = new StringBuilder(); + StringBuilder projectionColumns = new StringBuilder(); + for (CarbonColumn column : carbonColumns) { + carbonColumnNames.add(column.getColName()); + allColumns.append(column.getColName() + ","); + } + + if (!projection.equals("")) { + String[] columnNames = projection.split(","); + //verify that the columns parsed by Hive exist in the table + for (String col : columnNames) { + //show columns command will return these data + if (carbonColumnNames.contains(col)) { + projectionColumns.append(col + ","); + } + } + return projectionColumns.substring(0, projectionColumns.lastIndexOf(",")); + } else { + return allColumns.substring(0, allColumns.lastIndexOf(",")); + } + } + @Override public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException { return true; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java b/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java index d8705f8..ae931fb 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java @@ -130,6 +130,7 @@ public class HiveEmbeddedServer2 { conf.set("hive.added.files.path", ""); conf.set("hive.added.archives.path", ""); conf.set("fs.default.name", "file:///"); + conf.set(HiveConf.ConfVars.SUBMITLOCALTASKVIACHILD.varname, "false"); // clear mapred.job.tracker - Hadoop defaults to 'local' if not defined. Hive however expects // this to be set to 'local' - if it's not, it does a remote execution (i.e. no child JVM)
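----------------------------------------------------------------------

The QueryStatisticsRecorderImpl change wraps the existing switch in a null
check on statistic.getMessage(), because switching on a null String throws a
NullPointerException, while a statistic without a message should simply be
skipped. A minimal standalone sketch of the same guard pattern (Stat and the
constant string are illustrative stand-ins, not the actual CarbonData types):

import java.util.Arrays;
import java.util.List;

public class NullSafeSwitchSketch {

  // Illustrative stand-in for QueryStatistic: the message may legitimately be null.
  static final class Stat {
    final String message;
    final long count;
    Stat(String message, long count) { this.message = message; this.count = count; }
  }

  static long sumScannedPages(List<Stat> stats) {
    long scannedPages = 0;
    for (Stat s : stats) {
      // Guard first: switch (null) would throw a NullPointerException.
      if (s.message != null) {
        switch (s.message) {
          case "PAGE_SCANNED":
            scannedPages += s.count;
            break;
          default:
            break;
        }
      }
    }
    return scannedPages;
  }

  public static void main(String[] args) {
    List<Stat> stats = Arrays.asList(new Stat("PAGE_SCANNED", 5), new Stat(null, 1));
    System.out.println(sumScannedPages(stats)); // prints 5
  }
}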
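----------------------------------------------------------------------

In CarbonInputFormat, driver-side statistics were previously recorded only
under the generic "query.id" property; the patch falls back to Hive's
"hive.query.id" so statistics are also recorded for queries submitted through
Hive. A hedged sketch of just that lookup order against a plain Hadoop
Configuration (the surrounding recorder call is omitted and the id value is
hypothetical):

import org.apache.hadoop.conf.Configuration;

public class QueryIdLookupSketch {

  /** Prefer "query.id"; fall back to Hive's "hive.query.id" when it is unset. */
  static String resolveQueryId(Configuration conf) {
    String queryId = conf.get("query.id");
    return queryId != null ? queryId : conf.get("hive.query.id");
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.set("hive.query.id", "hive_20170712_demo");   // hypothetical query id
    System.out.println(resolveQueryId(conf));          // hive_20170712_demo
  }
}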
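----------------------------------------------------------------------

The new CarbonDictionaryDecodeReadSupport resolves dictionary surrogate keys
back to their original values on the executor and then wraps every column
value in a Hive Writable, so CarbonHiveRecordReader no longer has to rebuild
each row through the object inspector. A simplified sketch of the
primitive-to-Writable mapping only (the enum is an illustrative stand-in for
CarbonData's DataType; the real class also handles ARRAY and STRUCT through
ArrayWritable):

import java.sql.Timestamp;

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class WritableConversionSketch {

  // Illustrative type tag; the patch switches on CarbonData's own DataType enum.
  enum SimpleType { INT, LONG, TIMESTAMP, STRING, DECIMAL }

  static Writable toWritable(Object value, SimpleType type) {
    if (value == null) {
      return null;                       // a null column stays null
    }
    switch (type) {
      case INT:
        return new IntWritable((Integer) value);
      case LONG:
        return new LongWritable((Long) value);
      case TIMESTAMP:
        return new TimestampWritable(new Timestamp((Long) value));
      case STRING:
        return new Text(value.toString());
      case DECIMAL:
        return new HiveDecimalWritable(
            HiveDecimal.create(new java.math.BigDecimal(value.toString())));
      default:
        throw new IllegalArgumentException("Unknown primitive: " + type);
    }
  }

  public static void main(String[] args) {
    System.out.println(toWritable("carbon", SimpleType.STRING)); // carbon
    System.out.println(toWritable(42, SimpleType.INT));          // 42
  }
}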
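----------------------------------------------------------------------

CarbonHiveInputSplit now carries numberOfBlocklets across serialization:
write() appends out.writeInt(numberOfBlocklets) and readFields() reads it back
at the same position, keeping the two methods symmetric so the value survives
the trip from split planning to the task. A minimal Writable round-trip sketch
of that pattern (class and field layout are illustrative, not the full split):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class SplitRoundTripSketch implements Writable {

  String segmentId = "";
  int numberOfBlocklets;

  @Override public void write(DataOutput out) throws IOException {
    out.writeUTF(segmentId);
    out.writeInt(numberOfBlocklets);   // newly written field...
  }

  @Override public void readFields(DataInput in) throws IOException {
    segmentId = in.readUTF();
    numberOfBlocklets = in.readInt();  // ...read back in the same order
  }

  public static void main(String[] args) throws IOException {
    SplitRoundTripSketch original = new SplitRoundTripSketch();
    original.segmentId = "0";
    original.numberOfBlocklets = 7;

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    SplitRoundTripSketch copy = new SplitRoundTripSketch();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy.numberOfBlocklets); // 7
  }
}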
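----------------------------------------------------------------------

CarbonHiveRecordReader now parses hive.io.file.readcolumn.ids into an int[]
and, in next(), scatters the decoded projection values into their original
column positions of the row ArrayWritable instead of building a
projection-specific object inspector. A small sketch of that scatter step,
with plain arrays standing in for the Writable row:

public class ProjectionScatterSketch {

  /**
   * Place each projected value at its original column position:
   * projected[i] belongs to table column columnIds[i]; untouched slots stay null.
   */
  static Object[] scatter(Object[] projected, int[] columnIds, int tableWidth) {
    Object[] row = new Object[tableWidth];
    if (columnIds != null && columnIds.length == projected.length) {
      for (int i = 0; i < columnIds.length; i++) {
        row[columnIds[i]] = projected[i];
      }
    } else {
      // No projection info: values are already in table order.
      System.arraycopy(projected, 0, row, 0, projected.length);
    }
    return row;
  }

  public static void main(String[] args) {
    Object[] projected = { "bangalore", 27 };   // values read for columns 2 and 0
    int[] columnIds = { 2, 0 };                 // parsed from "2,0"
    Object[] row = scatter(projected, columnIds, 4);
    System.out.println(java.util.Arrays.toString(row)); // [27, null, bangalore, null]
  }
}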
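----------------------------------------------------------------------

MapredCarbonInputFormat replaces the hand-rolled dimension/measure loop with
getProjection(...), which keeps only those columns from
hive.io.file.readcolumn.names that actually exist in the Carbon table and
falls back to all table columns when the requested projection is empty;
populateCarbonTable(...) additionally reads the table schema from the matching
input path so the table identifier carries the persisted table id. A
standalone sketch of the projection-filtering logic with plain collections
(the real method iterates CarbonColumn objects; the virtual column in the
example is only there to show why filtering is needed):

import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class ProjectionFilterSketch {

  /** Keep only requested columns that exist in the table; an empty request means all columns. */
  static String buildProjection(String requested, List<String> tableColumns) {
    StringJoiner result = new StringJoiner(",");
    if (requested != null && !requested.isEmpty()) {
      for (String col : requested.split(",")) {
        if (tableColumns.contains(col)) {
          result.add(col);
        }
      }
    } else {
      tableColumns.forEach(result::add);
    }
    return result.toString();
  }

  public static void main(String[] args) {
    List<String> tableColumns = Arrays.asList("id", "name", "salary", "city");
    System.out.println(buildProjection("name,city,BLOCK__OFFSET__INSIDE__FILE", tableColumns)); // name,city
    System.out.println(buildProjection("", tableColumns));                                      // id,name,salary,city
  }
}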