Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4447D200D24 for ; Mon, 18 Sep 2017 12:01:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 42EE51609D8; Mon, 18 Sep 2017 10:01:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 76211160BE1 for ; Mon, 18 Sep 2017 12:01:25 +0200 (CEST) Received: (qmail 58297 invoked by uid 500); 18 Sep 2017 10:01:24 -0000 Mailing-List: contact commits-help@carbondata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.apache.org Delivered-To: mailing list commits@carbondata.apache.org Received: (qmail 57800 invoked by uid 99); 18 Sep 2017 10:01:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Sep 2017 10:01:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 43C9AF5772; Mon, 18 Sep 2017 10:01:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ravipesala@apache.org To: commits@carbondata.apache.org Date: Mon, 18 Sep 2017 10:01:35 -0000 Message-Id: <96bea0d64cf2438190971d2f76123b50@git.apache.org> In-Reply-To: <77011efb048a47a18ed2a856f8edd3fb@git.apache.org> References: <77011efb048a47a18ed2a856f8edd3fb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/51] [abbrv] carbondata git commit: [CARBONDATA-1469] Optimizations for Presto Integration archived-at: Mon, 18 Sep 2017 10:01:29 -0000 [CARBONDATA-1469] Optimizations for Presto Integration This closes #1348 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1551a7c7 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1551a7c7 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1551a7c7 Branch: refs/heads/branch-1.2 Commit: 1551a7c7d4046964a299d01a927b2900a84dc2f3 Parents: 0ab928e Author: Bhavya Authored: Mon Sep 11 16:33:07 2017 +0530 Committer: CHEN LIANG Committed: Tue Sep 12 07:08:37 2017 +0800 ---------------------------------------------------------------------- integration/presto/pom.xml | 536 ++++++++++++------- .../carbondata/presto/PrestoFilterUtil.java | 75 ++- .../readers/DecimalSliceStreamReader.java | 58 +- .../presto/readers/DoubleStreamReader.java | 27 +- .../presto/readers/IntegerStreamReader.java | 28 +- .../presto/readers/LongStreamReader.java | 27 +- .../presto/readers/ShortStreamReader.java | 80 +++ .../presto/readers/StreamReaders.java | 6 + .../presto/readers/TimestampStreamReader.java | 79 +++ 9 files changed, 682 insertions(+), 234 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/pom.xml ---------------------------------------------------------------------- diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml index 562718f..617ce93 100644 --- a/integration/presto/pom.xml +++ b/integration/presto/pom.xml @@ -15,7 +15,9 @@ See the License for the specific language governing permissions and limitations under the License. --> - + 4.0.0 @@ -37,49 +39,223 @@ - org.apache.thrift - libthrift - 0.9.3 - - - org.apache.carbondata - carbondata-core + carbondata-hadoop ${project.version} org.apache.spark - spark-sql_2.10 + spark-network-shuffle_2.11 + + + org.apache.spark + spark-sketch_2.11 + + + org.slf4j + slf4j-log4j12 + + + net.java.dev.jets3t + jets3t + + + javax.servlet + javax.servlet-api + + + org.apache.commons + commons-math3 + + + org.apache.httpcomponents + httpclient + + + org.antlr + antlr4-runtime + + + com.esotericsoftware + minlog + + + org.codehaus.janino + janino + + + net.jpountz.lz4 + lz4 + + + net.sf.py4j + py4j + + + org.spark-project.spark + unused + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-client + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.spark + spark-tags_2.11 + + + org.apache.parquet + parquet-column + + + org.apache.parquet + parquet-hadoop + + + org.glassfish.jersey.core + jersey-client + + + org.glassfish.jersey.core + jersey-common + + + org.glassfish.jersey.core + jersey-server + + + org.glassfish.jersey.containers + jersey-container-servlet + + + org.glassfish.jersey.containers + jersey-container-servlet-core + + + org.glassfish.jersey.containers + jersey-container-servlet-core - - - - - - org.apache.carbondata - carbondata-common - ${project.version} - - - org.apache.carbondata - carbondata-processing - ${project.version} - + + org.apache.curator + curator-recipes + + + org.apache.avro + avro-mapred + + + com.twitter + chill_2.11 + + + io.dropwizard.metrics + metrics-core + + + io.dropwizard.metrics + metrics-jvm + + + io.dropwizard.metrics + metrics-json + + + io.dropwizard.metrics + metrics-graphite + + + com.google.code.findbugs + jsr305 + + + net.java.dev + jets3t + + + org.apache.xbean + xbean-asm5-shaded + org.apache.spark - spark-sql_2.10 + spark-launcher_2.11 + + + org.apache.spark + spark-network-common_2.11 + + + com.ning + compress-lzf + + + org.roaringbitmap + RoaringBitmap + + + com.thoughtworks.paranamer + paranamer + + + org.scala-lang + scalap + + + org.scala-lang + scala-compiler + + + org.scala-lang..modules + parser-combinators_2.11 + + + org.scala-lang..modules + scala-xml_2.11 + + + org.scalatest + scalatest_2.11 + + + org.apache.zookeeper + zookeeper + + + net.sf.py4 + py4j + + + net.razorvine + pyrolite + + + com.clearspring.analytics + stream + + + org.slf4j + jul-to-slf4j + + + org.apache.ivy + ivy + + + oro + oro - org.apache.carbondata - carbondata-hadoop - ${project.version} - - - io.airlift bootstrap 0.144 @@ -87,6 +263,38 @@ org.slf4j + slf4j-jdk14 + + + ch.qos.logback + logback-core + + + org.slf4j + jcl-over-slf4j + + + javax.xml.bind + jaxb-api + + + aopalliance + aopalliance + + + org.weakref + jmxutils + + + cglib + cglib-nodep + + + com.google.code.findbugs + annotations + + + org.slf4j log4j-over-slf4j @@ -98,21 +306,6 @@ 0.144 - - - io.airlift - log - 0.144 - - - - - io.airlift - slice - 0.29 - provided - - io.airlift units @@ -126,19 +319,6 @@ 2.6.0 provided - - - com.google.guava - guava - 18.0 - - - - com.google.inject - guice - 3.0 - - com.facebook.presto @@ -146,152 +326,140 @@ ${presto.version} provided - + + commons-lang + commons-lang + 2.5 + com.facebook.presto.hadoop hadoop-apache2 2.7.3-1 - - org.apache.spark - spark-core_2.11 - 2.1.0 + org.apache.commons + commons-compress + 1.4.1 - com.fasterxml.jackson.core - jackson-databind + org.tukaani + xz + - - org.apache.spark - spark-catalyst_2.11 - 2.1.0 - - - org.apache.spark - spark-sql_2.11 - 2.1.0 - - - com.fasterxml.jackson.core - jackson-databind - - - - - - - maven-compiler-plugin - - 1.8 - 1.8 - - - - org.apache.maven.plugins - maven-surefire-plugin - 2.18 - - - ${project.build.directory}/surefire-reports - -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m - - true - - false - - + + + + maven-compiler-plugin + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.18 + + + ${project.build.directory}/surefire-reports + -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m + + true + + false + + - - org.apache.maven.plugins - maven-checkstyle-plugin - 2.17 - - true - - + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.17 + + true + + - - org.apache.maven.plugins - maven-enforcer-plugin - 1.4.1 - - true - - + + org.apache.maven.plugins + maven-enforcer-plugin + 1.4.1 + + true + + - - com.ning.maven.plugins - maven-dependency-versions-check-plugin - - true - false - - + + com.ning.maven.plugins + maven-dependency-versions-check-plugin + + true + false + + - - org.apache.maven.plugins - maven-dependency-plugin - - false - - + + org.apache.maven.plugins + maven-dependency-plugin + + false + + - - com.ning.maven.plugins - maven-duplicate-finder-plugin - - true - - + + com.ning.maven.plugins + maven-duplicate-finder-plugin + + true + + - - io.takari.maven.plugins - presto-maven-plugin - 0.1.12 - true - + + io.takari.maven.plugins + presto-maven-plugin + 0.1.12 + true + - - pl.project13.maven - git-commit-id-plugin - - true - - - - org.scala-tools - maven-scala-plugin - 2.15.2 - - - compile - - compile - - compile - - - testCompile - - testCompile - - test - - - process-resources - - compile - - - - - - + + pl.project13.maven + git-commit-id-plugin + + true + + + + org.scala-tools + maven-scala-plugin + 2.15.2 + + + compile + + compile + + compile + + + testCompile + + testCompile + + test + + + process-resources + + compile + + + + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java index 9a5a5cb..a958e63 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java @@ -17,6 +17,8 @@ package org.apache.carbondata.presto; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; @@ -75,8 +77,8 @@ public class PrestoFilterUtil { else if (colType == VarcharType.VARCHAR) return DataType.STRING; else if (colType == DateType.DATE) return DataType.DATE; else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP; - else if (colType == DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(), - carbondataColumnHandle.getScale())) return DataType.DECIMAL; + else if (colType.equals(DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(), + carbondataColumnHandle.getScale()))) return DataType.DECIMAL; else return DataType.STRING; } @@ -104,13 +106,12 @@ public class PrestoFilterUtil { checkArgument(domain.getType().isOrderable(), "Domain type must be orderable"); List singleValues = new ArrayList<>(); - List disjuncts = new ArrayList<>(); + Map> valueExpressionMap = new HashMap<>(); for (Range range : domain.getValues().getRanges().getOrderedRanges()) { if (range.isSingleValue()) { Object value = ConvertDataByType(range.getLow().getValue(), type); singleValues.add(value); } else { - List rangeConjuncts = new ArrayList<>(); if (!range.getLow().isLowerUnbounded()) { Object value = ConvertDataByType(range.getLow().getValue(), type); switch (range.getLow().getBound()) { @@ -120,14 +121,20 @@ public class PrestoFilterUtil { } else { GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype)); - rangeConjuncts.add(greater); + if(valueExpressionMap.get(value) == null) { + valueExpressionMap.put(value, new ArrayList<>()); + } + valueExpressionMap.get(value).add(greater); } break; case EXACTLY: GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype)); - rangeConjuncts.add(greater); + if(valueExpressionMap.get(value) == null) { + valueExpressionMap.put(value, new ArrayList<>()); + } + valueExpressionMap.get(value).add(greater); break; case BELOW: throw new IllegalArgumentException("Low marker should never use BELOW bound"); @@ -143,18 +150,23 @@ public class PrestoFilterUtil { case EXACTLY: LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype)); - rangeConjuncts.add(less); + if(valueExpressionMap.get(value) == null) { + valueExpressionMap.put(value, new ArrayList<>()); + } + valueExpressionMap.get(value).add(less); break; case BELOW: LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype)); - rangeConjuncts.add(less2); + if(valueExpressionMap.get(value) == null) { + valueExpressionMap.put(value, new ArrayList<>()); + } + valueExpressionMap.get(value).add(less2); break; default: throw new AssertionError("Unhandled bound: " + range.getHigh().getBound()); } } - disjuncts.addAll(rangeConjuncts); } } if (singleValues.size() == 1) { @@ -174,19 +186,34 @@ public class PrestoFilterUtil { .map((a) -> new LiteralExpression(ConvertDataByType(a, type), coltype)) .collect(Collectors.toList()); candidates = new ListExpression(exs); - filters.add(new InExpression(colExpression, candidates)); - } else if (disjuncts.size() > 0) { - if (disjuncts.size() > 1) { - Expression finalFilters = new OrExpression(disjuncts.get(0), disjuncts.get(1)); - if (disjuncts.size() > 2) { - for (int i = 2; i < disjuncts.size(); i++) { - filters.add(new AndExpression(finalFilters, disjuncts.get(i))); + } else if (valueExpressionMap.size() > 0) { + List valuefilters = new ArrayList<>(); + Expression finalFilters = null; + List expressions; + for (Map.Entry> entry : valueExpressionMap.entrySet()) { + expressions = valueExpressionMap.get(entry.getKey()); + if (expressions.size() == 1) { + finalFilters = expressions.get(0); + } else if (expressions.size() >= 2) { + finalFilters = new OrExpression(expressions.get(0), expressions.get(1)); + for (int i = 2; i < expressions.size(); i++) { + finalFilters = new OrExpression(finalFilters, expressions.get(i)); } - } else { - filters.add(finalFilters); } - } else if (disjuncts.size() == 1) filters.add(disjuncts.get(0)); + valuefilters.add(finalFilters); + } + + if(valuefilters.size() == 1){ + finalFilters = valuefilters.get(0); + } else if (valuefilters.size() >= 2) { + finalFilters = new AndExpression(valuefilters.get(0), valuefilters.get(1)); + for (int i = 2; i < valuefilters.size() ; i++) { + finalFilters = new AndExpression(finalFilters, valuefilters.get(i)); + } + } + + filters.add(finalFilters); } } @@ -196,7 +223,7 @@ public class PrestoFilterUtil { finalFilters = new AndExpression(tmp.get(0), tmp.get(1)); if (tmp.size() > 2) { for (int i = 2; i < tmp.size(); i++) { - finalFilters = new OrExpression(finalFilters, tmp.get(i)); + finalFilters = new AndExpression(finalFilters, tmp.get(i)); } } } else if (tmp.size() == 1) finalFilters = tmp.get(0); @@ -223,6 +250,14 @@ public class PrestoFilterUtil { Date date = c.getTime(); return date.getTime() * 1000; } + else if (type instanceof DecimalType) { + if(rawdata instanceof Double) { + return new BigDecimal((Double) rawdata); + } else if (rawdata instanceof Long) { + return new BigDecimal(new BigInteger(String.valueOf(rawdata)), + ((DecimalType) type).getScale()); + } + } return rawdata; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java index 89d4e60..6612ab0 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java @@ -66,20 +66,17 @@ public class DecimalSliceStreamReader extends AbstractStreamReader { int scale = ((DecimalType)type).getScale(); int precision = ((DecimalType)type).getPrecision(); if (columnVector != null) { - for(int i = 0; i < numberOfRows ; i++ ){ - if(columnVector.isNullAt(i)) { - builder.appendNull(); + if(columnVector.anyNullsSet()) + { + handleNullInVector(type, numberOfRows, builder, scale, precision); + } else { + if(isShortDecimal(type)) { + populateShortDecimalVector(type, numberOfRows, builder, scale, precision); } else { - Slice slice = - getSlice(columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(), type); - if (isShortDecimal(type)) { - type.writeLong(builder, parseLong((DecimalType) type, slice, 0, slice.length())); - } else { - type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length())); - } + populateLongDecimalVector(type, numberOfRows, builder, scale, precision); } } - } + } } else { if (streamData != null) { @@ -182,4 +179,43 @@ public class DecimalSliceStreamReader extends AbstractStreamReader { return decimal; } + + private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder, int scale, + int precision) { + for (int i = 0; i < numberOfRows; i++) { + if (columnVector.isNullAt(i)) { + builder.appendNull(); + } else { + if (isShortDecimal(type)) { + long rescaledDecimal = Decimals + .rescale(columnVector.getDecimal(i, precision, scale).toLong(), + columnVector.getDecimal(i, precision, scale).scale(), scale); + type.writeLong(builder, rescaledDecimal); + } else { + Slice slice = + getSlice(columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(), type); + type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length())); + } + } + } + } + + private void populateShortDecimalVector(Type type, int numberOfRows, BlockBuilder builder, + int scale, int precision) { + for (int i = 0; i < numberOfRows; i++) { + BigDecimal decimalValue = columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(); + long rescaledDecimal = Decimals.rescale(decimalValue.unscaledValue().longValue(), + decimalValue.scale(), scale); + type.writeLong(builder, rescaledDecimal); + } + } + + private void populateLongDecimalVector(Type type, int numberOfRows, BlockBuilder builder, + int scale, int precision) { + for (int i = 0; i < numberOfRows; i++) { + Slice slice = getSlice(columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(), type); + type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length())); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java index cacf5ce..2b90a8d 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java @@ -47,12 +47,11 @@ public class DoubleStreamReader extends AbstractStreamReader { numberOfRows = batchSize; builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); if (columnVector != null) { - for (int i = 0; i < numberOfRows; i++) { - if (columnVector.isNullAt(i)) { - builder.appendNull(); - } else { - type.writeDouble(builder, columnVector.getDouble(i)); - } + if(columnVector.anyNullsSet()) { + handleNullInVector(type, numberOfRows, builder); + } + else { + populateVector(type, numberOfRows, builder); } } } else { @@ -68,4 +67,20 @@ public class DoubleStreamReader extends AbstractStreamReader { return builder.build(); } + private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + if (columnVector.isNullAt(i)) { + builder.appendNull(); + } else { + type.writeDouble(builder, columnVector.getDouble(i)); + } + } + } + + private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + type.writeDouble(builder, columnVector.getDouble(i)); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java index 13280c8..ccc0192 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java @@ -41,13 +41,11 @@ public class IntegerStreamReader extends AbstractStreamReader { numberOfRows = batchSize; builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); if (columnVector != null) { - for(int i = 0; i < numberOfRows ; i++ ){ - if(columnVector.isNullAt(i)){ - builder.appendNull(); - } else { - type.writeLong(builder, ((Integer)columnVector.getInt(i)).longValue()); - } - + if(columnVector.anyNullsSet()) { + handleNullInVector(type, numberOfRows, builder); + } + else { + populateVector(type, numberOfRows, builder); } } @@ -64,4 +62,20 @@ public class IntegerStreamReader extends AbstractStreamReader { return builder.build(); } + private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + if (columnVector.isNullAt(i)) { + builder.appendNull(); + } else { + type.writeLong(builder, ((Integer) columnVector.getInt(i)).longValue()); + } + } + } + + private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + type.writeLong(builder, columnVector.getInt(i)); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java index 9d602a6..5081b32 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java @@ -37,12 +37,11 @@ public class LongStreamReader extends AbstractStreamReader { numberOfRows = batchSize; builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); if (columnVector != null) { - for (int i = 0; i < numberOfRows; i++) { - if (columnVector.isNullAt(i)) { - builder.appendNull(); - } else { - type.writeLong(builder, columnVector.getLong(i)); - } + if(columnVector.anyNullsSet()) { + handleNullInVector(type, numberOfRows, builder); + } + else { + populateVector(type, numberOfRows, builder); } } @@ -59,4 +58,20 @@ public class LongStreamReader extends AbstractStreamReader { return builder.build(); } + private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + if (columnVector.isNullAt(i)) { + builder.appendNull(); + } else { + type.writeLong(builder, columnVector.getLong(i)); + } + } + } + + private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + type.writeLong(builder, columnVector.getLong(i)); + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java new file mode 100644 index 0000000..59d8e96 --- /dev/null +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto.readers; + +import java.io.IOException; + +import com.facebook.presto.spi.block.Block; +import com.facebook.presto.spi.block.BlockBuilder; +import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.type.Type; + +public class ShortStreamReader extends AbstractStreamReader { + + + public ShortStreamReader( ) { + + } + + public Block readBlock(Type type) + throws IOException + { + int numberOfRows = 0; + BlockBuilder builder = null; + if(isVectorReader) { + numberOfRows = batchSize; + builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); + if (columnVector != null) { + if(columnVector.anyNullsSet()) { + handleNullInVector(type, numberOfRows, builder); + } + else { + populateVector(type, numberOfRows, builder); + } + } + + } else { + numberOfRows = streamData.length; + builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); + if (streamData != null) { + for(int i = 0; i < numberOfRows ; i++ ){ + type.writeLong(builder,(Short)streamData[i]); + } + } + } + + return builder.build(); + } + + private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + if (columnVector.isNullAt(i)) { + builder.appendNull(); + } else { + type.writeLong(builder, (columnVector.getShort(i))); + } + } + } + + private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + type.writeLong(builder, (columnVector.getShort(i))); + } + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java index abd8787..86f863a 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java @@ -23,6 +23,8 @@ import com.facebook.presto.spi.block.SliceArrayBlock; import com.facebook.presto.spi.type.DateType; import com.facebook.presto.spi.type.DecimalType; import com.facebook.presto.spi.type.IntegerType; +import com.facebook.presto.spi.type.SmallintType; +import com.facebook.presto.spi.type.TimestampType; import com.facebook.presto.spi.type.Type; import io.airlift.slice.Slice; @@ -44,6 +46,10 @@ public final class StreamReaders { return new IntegerStreamReader(); } else if (type instanceof DecimalType) { return new DecimalSliceStreamReader(); + } else if (type instanceof SmallintType) { + return new ShortStreamReader(); + } else if (type instanceof TimestampType) { + return new TimestampStreamReader(); } return new LongStreamReader(); } else if (javaType == double.class) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/1551a7c7/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java new file mode 100644 index 0000000..8ea3efb --- /dev/null +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto.readers; + +import java.io.IOException; + +import com.facebook.presto.spi.block.Block; +import com.facebook.presto.spi.block.BlockBuilder; +import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.type.Type; + +public class TimestampStreamReader extends AbstractStreamReader { + + private int TIMESTAMP_DIVISOR = 1000; + + public TimestampStreamReader() { + + } + + public Block readBlock(Type type) throws IOException { + int numberOfRows = 0; + BlockBuilder builder = null; + if (isVectorReader) { + numberOfRows = batchSize; + builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); + if (columnVector != null) { + if(columnVector.anyNullsSet()) { + handleNullInVector(type, numberOfRows, builder); + } + else { + populateVector(type, numberOfRows, builder); + } + } + + } else { + numberOfRows = streamData.length; + builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows); + if (streamData != null) { + for (int i = 0; i < numberOfRows; i++) { + type.writeLong(builder, (Long) streamData[i]); + } + } + } + + return builder.build(); + } + + private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + if (columnVector.isNullAt(i)) { + builder.appendNull(); + } else { + type.writeLong(builder, columnVector.getLong(i)/ TIMESTAMP_DIVISOR); + } + } + } + + private void populateVector(Type type, int numberOfRows, BlockBuilder builder) { + for (int i = 0; i < numberOfRows; i++) { + type.writeLong(builder, columnVector.getLong(i)/TIMESTAMP_DIVISOR); + } + } + +}