carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [1/2] incubator-carbondata git commit: Added support for DecimalType and Timestamp in Presto Integration with Spark2.1
Date Mon, 17 Apr 2017 15:20:23 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/presto ca0ecbd50 -> 1d345efe8


Added support for DecimalType and Timestamp in Presto Integration with Spark2.1

Added support for DecimalType and Fixed Date and time issues

Fixed the Apache License and removed comments

Corrected indentation and spaces


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ddf06f15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ddf06f15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ddf06f15

Branch: refs/heads/presto
Commit: ddf06f15541624a06287e211b71f0299eacb5c3e
Parents: ca0ecbd
Author: Bhavya <bhavya@knoldus.com>
Authored: Mon Apr 17 11:51:44 2017 +0530
Committer: Bhavya <bhavya@knoldus.com>
Committed: Mon Apr 17 12:17:36 2017 +0530

----------------------------------------------------------------------
 .../presto/CarbondataColumnHandle.java          |  25 ++-
 .../carbondata/presto/CarbondataConnector.java  |  10 +-
 .../presto/CarbondataConnectorFactory.java      |   5 +-
 .../carbondata/presto/CarbondataMetadata.java   |  23 +--
 .../carbondata/presto/CarbondataModule.java     |   5 +
 .../carbondata/presto/CarbondataPageSource.java | 178 +++++++++++++++++++
 .../presto/CarbondataPageSourceProvider.java    |  50 ++++++
 .../presto/CarbondataRecordCursor.java          |  60 ++++++-
 .../carbondata/presto/CarbondataRecordSet.java  |   8 +-
 .../presto/CarbondataRecordSetProvider.java     |  12 +-
 10 files changed, 349 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ddf06f15/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
index b9152b5..4a9b7ed 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -46,6 +46,14 @@ public class CarbondataColumnHandle implements ColumnHandle {
   private final String columnUniqueId;
   private final boolean isInvertedIndex;
 
+  /**
+   * Used when this column contains decimal data.
+   */
+  private int scale;
+
+  private int precision;
+
+
   public boolean isMeasure() {
     return isMeasure;
   }
@@ -76,7 +84,9 @@ public class CarbondataColumnHandle implements ColumnHandle {
       @JsonProperty("isMeasure") boolean isMeasure,
       @JsonProperty("columnGroupId") int columnGroupId,
       @JsonProperty("columnUniqueId") String columnUniqueId,
-      @JsonProperty("isInvertedIndex") boolean isInvertedIndex) {
+      @JsonProperty("isInvertedIndex") boolean isInvertedIndex,
+      @JsonProperty("precision") int precision,
+      @JsonProperty("scale") int scale) {
     this.connectorId = requireNonNull(connectorId, "connectorId is null");
     this.columnName = requireNonNull(columnName, "columnName is null");
     this.columnType = requireNonNull(columnType, "columnType is null");
@@ -89,6 +99,8 @@ public class CarbondataColumnHandle implements ColumnHandle {
     this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null");
     this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId
is null");
     this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
+    this.precision = precision;
+    this.scale = scale;
   }
 
   @JsonProperty public String getConnectorId() {
@@ -132,4 +144,15 @@ public class CarbondataColumnHandle implements ColumnHandle {
     return toStringHelper(this).add("connectorId", connectorId).add("columnName", columnName)
         .add("columnType", columnType).add("ordinalPosition", ordinalPosition).toString();
   }
+
+  @JsonProperty public int getScale() {
+    return scale;
+  }
+
+  @JsonProperty public int getPrecision() {
+    return precision;
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ddf06f15/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
index 0f1dbda..406ed93 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
@@ -35,15 +35,17 @@ public class CarbondataConnector implements Connector {
   private final ConnectorSplitManager splitManager;
   private final ConnectorRecordSetProvider recordSetProvider;
   private final ClassLoader classLoader;
+  private final ConnectorPageSourceProvider pageSourceProvider;
 
   public CarbondataConnector(LifeCycleManager lifeCycleManager, CarbondataMetadata metadata,
       ConnectorSplitManager splitManager, ConnectorRecordSetProvider recordSetProvider,
-      ClassLoader classLoader) {
+      ClassLoader classLoader, ConnectorPageSourceProvider pageSourceProvider) {
     this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
     this.metadata = requireNonNull(metadata, "metadata is null");
     this.splitManager = requireNonNull(splitManager, "splitManager is null");
     this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
     this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
   }
 
   @Override public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel,
@@ -65,6 +67,12 @@ public class CarbondataConnector implements Connector {
     return recordSetProvider;
   }
 
+  @Override
+  public ConnectorPageSourceProvider getPageSourceProvider()
+  {
+    return pageSourceProvider;
+  }
+
   @Override public final void shutdown() {
     try {
       lifeCycleManager.stop();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ddf06f15/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index d1c8082..d97f19e 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.presto;
 import com.facebook.presto.spi.ConnectorHandleResolver;
 import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
 import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
 import com.google.common.base.Throwables;
 import com.google.inject.Injector;
@@ -70,10 +71,12 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
       ConnectorRecordSetProvider connectorRecordSet =
           injector.getInstance(ConnectorRecordSetProvider.class);
+       ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
 
       return new CarbondataConnector(lifeCycleManager, metadata,
           new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
-          classLoader
+          classLoader,
+          new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader)
       );
     } catch (Exception e) {
       throw Throwables.propagate(e);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ddf06f15/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index f2d594a..7701490 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -123,9 +123,8 @@ public class CarbondataMetadata implements ConnectorMetadata {
     List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
     for (CarbonColumn col : carbonColumns) {
       //show columns command will return these data
-      Type columnType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
-      ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(),
-          columnType);
+      Type columnType = CarbondataType2SpiMapper(col.getColumnSchema());
+      ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(),
columnType);
       columnsMetaList.add(columnMeta);
     }
 
@@ -162,21 +161,21 @@ public class CarbondataMetadata implements ConnectorMetadata {
       column.getNumberOfChild();
       column.getListOfChildDimensions();
 
-      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      Type spiType = CarbondataType2SpiMapper(cs);
       columnHandles.put(cs.getColumnName(),
           new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, column.getSchemaOrdinal(),
               column.getKeyOrdinal(), column.getColumnGroupOrdinal(), false, cs.getColumnGroupId(),
-              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
     }
 
     for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
       ColumnSchema cs = measure.getColumnSchema();
 
-      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      Type spiType = CarbondataType2SpiMapper(cs);
       columnHandles.put(cs.getColumnName(),
           new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, cs.getSchemaOrdinal(),
               measure.getOrdinal(), cs.getColumnGroupId(), true, cs.getColumnGroupId(),
-              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
     }
 
     //should i cache it?
@@ -230,7 +229,8 @@ public class CarbondataMetadata implements ConnectorMetadata {
     return getTableMetadata(carbondataTableHandle.getSchemaTableName());
   }
 
-  public static Type CarbondataType2SpiMapper(DataType colType) {
+  public static Type CarbondataType2SpiMapper(ColumnSchema columnSchema) {
+    DataType colType = columnSchema.getDataType();
     switch (colType) {
       case BOOLEAN:
         return BooleanType.BOOLEAN;
@@ -243,9 +243,12 @@ public class CarbondataMetadata implements ConnectorMetadata {
       case FLOAT:
       case DOUBLE:
         return DoubleType.DOUBLE;
-
       case DECIMAL:
-        return DecimalType.createDecimalType();
+        if(columnSchema.getPrecision() > 0){
+          return DecimalType.createDecimalType(columnSchema.getPrecision(), columnSchema.getScale());
+        } else {
+          return DecimalType.createDecimalType();
+        }
       case STRING:
         return VarcharType.VARCHAR;
       case DATE:

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ddf06f15/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
index 0baa64a..1d8b2b2 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.presto;
 
 import org.apache.carbondata.presto.impl.CarbonTableConfig;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
+
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
 import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
 import com.facebook.presto.spi.connector.ConnectorSplitManager;
 import com.facebook.presto.spi.type.Type;
@@ -55,7 +57,10 @@ public class CarbondataModule implements Module {
     binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
     binder.bind(ConnectorRecordSetProvider.class).to(CarbondataRecordSetProvider.class)
         .in(Scopes.SINGLETON);
+    binder.bind(ConnectorPageSourceProvider.class).to(CarbondataPageSourceProvider.class)
+        .in(Scopes.SINGLETON);
     binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
+    binder.bind(CarbondataRecordSetProvider.class).in(Scopes.SINGLETON);
     configBinder(binder).bindConfig(CarbonTableConfig.class);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ddf06f15/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
new file mode 100644
index 0000000..7c50c66
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.Page;
+import com.facebook.presto.spi.PageBuilder;
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.Type;
+import io.airlift.slice.Slice;
+
+import static com.facebook.presto.spi.type.Decimals.encodeUnscaledValue;
+import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.math.RoundingMode.HALF_UP;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Carbondata Page Source class for custom Carbondata RecordSet Iteration.
+ */
+public class CarbondataPageSource implements ConnectorPageSource {
+
+  private static final int ROWS_PER_REQUEST = 4096;
+  private final RecordCursor cursor;
+  private final List<Type> types;
+  private final PageBuilder pageBuilder;
+  private boolean closed;
+  private final char[] buffer = new char[100];
+
+  public CarbondataPageSource(RecordSet recordSet)
+  {
+    this(requireNonNull(recordSet, "recordSet is null").getColumnTypes(), recordSet.cursor());
+  }
+
+  public CarbondataPageSource(List<Type> types, RecordCursor cursor)
+  {
+    this.cursor = requireNonNull(cursor, "cursor is null");
+    this.types = unmodifiableList(new ArrayList<>(requireNonNull(types, "types is null")));
+    this.pageBuilder = new PageBuilder(this.types);
+  }
+
+  public RecordCursor getCursor()
+  {
+    return cursor;
+  }
+
+  @Override public long getTotalBytes() {
+    return cursor.getTotalBytes();
+  }
+
+  @Override public long getCompletedBytes() {
+    return cursor.getCompletedBytes();
+  }
+
+  @Override public long getReadTimeNanos() {
+    return cursor.getReadTimeNanos();
+  }
+
+  @Override public boolean isFinished() {
+    return closed && pageBuilder.isEmpty();
+  }
+
+  @Override public Page getNextPage() {
+    if (!closed) {
+      int i;
+      for (i = 0; i < ROWS_PER_REQUEST; i++) {
+        if (pageBuilder.isFull()) {
+          break;
+        }
+        if (!cursor.advanceNextPosition()) {
+          closed = true;
+          break;
+        }
+
+        pageBuilder.declarePosition();
+        for (int column = 0; column < types.size(); column++) {
+          BlockBuilder output = pageBuilder.getBlockBuilder(column);
+          if (cursor.isNull(column)) {
+            output.appendNull();
+          } else {
+            Type type = types.get(column);
+            Class<?> javaType = type.getJavaType();
+            if (javaType == boolean.class) {
+              type.writeBoolean(output, cursor.getBoolean(column));
+            } else if (javaType == long.class) {
+              type.writeLong(output, cursor.getLong(column));
+            } else if (javaType == double.class) {
+              type.writeDouble(output, cursor.getDouble(column));
+            } else if (javaType == Slice.class) {
+              Slice slice = cursor.getSlice(column);
+              if(type instanceof  DecimalType)
+              {
+                if (isShortDecimal(type)) {
+                  type.writeLong(output, parseLong((DecimalType) type, slice, 0, slice.length()));
+                } else {
+                  type.writeSlice(output, parseSlice((DecimalType) type, slice, 0, slice.length()));
+                }
+              } else {
+                type.writeSlice(output, slice, 0, slice.length());
+              }
+            } else {
+              type.writeObject(output, cursor.getObject(column));
+            }
+          }
+        }
+      }
+    }
+
+    // only return a page if the buffer is full or we are finishing
+    if (pageBuilder.isEmpty() || (!closed && !pageBuilder.isFull())) {
+      return null;
+    }
+    Page page = pageBuilder.build();
+    pageBuilder.reset();
+    return page;
+ }
+
+  @Override public long getSystemMemoryUsage() {
+    return cursor.getSystemMemoryUsage() + pageBuilder.getSizeInBytes();
+  }
+
+  @Override public void close() throws IOException {
+    closed = true;
+    cursor.close();
+
+  }
+
+  private long parseLong(DecimalType type, Slice slice, int offset, int length)
+  {
+    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+    return decimal.unscaledValue().longValue();
+  }
+
+
+  private Slice parseSlice(DecimalType type, Slice slice, int offset, int length)
+  {
+    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+    return encodeUnscaledValue(decimal.unscaledValue());
+  }
+
+  private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length)
+  {
+    checkArgument(length < buffer.length);
+    for (int i = 0; i < length; i++) {
+      buffer[i] = (char) slice.getByte(offset + i);
+    }
+    BigDecimal decimal = new BigDecimal(buffer, 0, length);
+    checkState(decimal.scale() <= type.getScale(), "Read decimal value scale larger than
column scale");
+    decimal = decimal.setScale(type.getScale(), HALF_UP);
+    checkState(decimal.precision() <= type.getPrecision(), "Read decimal precision larger
than column precision");
+    return decimal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ddf06f15/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
new file mode 100644
index 0000000..46d8611
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.inject.Inject;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Provider Class for Carbondata Page Source class.
+ */
+public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider {
+
+  private CarbondataRecordSetProvider carbondataRecordSetProvider;
+
+  @Inject
+  public CarbondataPageSourceProvider(CarbondataRecordSetProvider carbondataRecordSetProvider)
+  {
+    this.carbondataRecordSetProvider = requireNonNull(carbondataRecordSetProvider, "recordSetProvider
is null");
+  }
+
+  @Override
+  public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) {
+    return new CarbondataPageSource(carbondataRecordSetProvider.getRecordSet(transactionHandle,
session, split, columns));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ddf06f15/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
index ad47f75..2e97dc0 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -18,7 +18,14 @@
 package org.apache.carbondata.presto;
 
 import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.Decimals;
+import com.facebook.presto.spi.type.TimestampType;
 import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+
 import com.google.common.base.Strings;
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
@@ -26,16 +33,22 @@ import io.airlift.slice.Slices;
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
 
 import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
+import static com.facebook.presto.spi.type.Decimals.rescale;
 import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
 import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static io.airlift.slice.Slices.utf8Slice;
 
 public class CarbondataRecordCursor implements RecordCursor {
 
@@ -114,8 +127,10 @@ public class CarbondataRecordCursor implements RecordCursor {
 
   @Override public long getLong(int field) {
     String timeStr = getFieldValue(field);
-    Long milliSec = 0L;
-
+    Type actual = getType(field);
+    if(actual instanceof TimestampType){
+      return new Timestamp(Long.parseLong(timeStr)).getTime()/1000;
+    }
     //suppose the
     return Math.round(Double.parseDouble(getFieldValue(field)));
   }
@@ -126,8 +141,41 @@ public class CarbondataRecordCursor implements RecordCursor {
   }
 
   @Override public Slice getSlice(int field) {
-    checkFieldType(field, VARCHAR);
-    return Slices.utf8Slice(getFieldValue(field));
+    Type decimalType = getType(field);
+    if (decimalType instanceof DecimalType) {
+      DecimalType actual = (DecimalType) decimalType;
+      CarbondataColumnHandle carbondataColumnHandle = columnHandles.get(field);
+      if(carbondataColumnHandle.getPrecision() > 0 ) {
+        checkFieldType(field, DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
carbondataColumnHandle.getScale()));
+      } else {
+        checkFieldType(field, DecimalType.createDecimalType());
+      }
+      String fieldValue = getFieldValue(field);
+      BigDecimal bigDecimalValue = new BigDecimal(fieldValue);
+      if (isShortDecimal(decimalType)) {
+        return utf8Slice(Decimals.toString(bigDecimalValue.longValue(), actual.getScale()));
+      } else {
+        if (bigDecimalValue.scale() > actual.getScale()) {
+          BigInteger unscaledDecimal =
+              rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(),
+                  bigDecimalValue.scale());
+          Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal);
+          return utf8Slice(Decimals.toString(decimalSlice, actual.getScale()));
+          //return decimalSlice;
+        } else {
+          BigInteger unscaledDecimal =
+              rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(), actual.getScale());
+          Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal);
+          return utf8Slice(Decimals.toString(decimalSlice, actual.getScale()));
+          //return decimalSlice;
+
+        }
+
+      }
+    } else {
+      checkFieldType(field, VARCHAR);
+      return utf8Slice(getFieldValue(field));
+    }
   }
 
   @Override public Object getObject(int field) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ddf06f15/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index d3fd7a0..7bf0e84 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -99,12 +99,10 @@ public class CarbondataRecordSet implements RecordSet {
       RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns,
split);
       return rc;
     } catch (QueryExecutionException e) {
-      //throw new InterruptedException(e.getMessage());
-      System.out.println(e.getMessage());
-    } catch (Exception ex) {
-      System.out.println(ex.toString());
+       throw new RuntimeException(e.getMessage(), e);
+   } catch (Exception ex) {
+      throw new RuntimeException(ex.getMessage(), ex);
     }
-    return null;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ddf06f15/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index f0958c7..a9652cc 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -129,7 +129,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider
{
       CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
       Type type = cdch.getColumnType();
 
-      DataType coltype = Spi2CarbondataTypeMapper(type);
+      DataType coltype = Spi2CarbondataTypeMapper(cdch);
       Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
 
       domain = originalConstraint.getDomains().get().get(c);
@@ -200,6 +200,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider
{
         if (coltype.equals(DataType.STRING)) {
           ex = new EqualToExpression(colExpression,
               new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+        } else if (coltype.equals(DataType.TIMESTAMP) || coltype.equals(DataType.DATE)) {
+          Long value = (Long) singleValues.get(0) * 1000;
+          ex = new EqualToExpression(colExpression,
+              new LiteralExpression(value , coltype));
         } else ex = new EqualToExpression(colExpression,
             new LiteralExpression(singleValues.get(0), coltype));
         filters.add(ex);
@@ -241,16 +245,18 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider
{
         CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
   }
 
-  public static DataType Spi2CarbondataTypeMapper(Type colType) {
+  public static DataType Spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle)
{
+    Type colType = carbondataColumnHandle.getColumnType();
     if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
     else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
     else if (colType == IntegerType.INTEGER) return DataType.INT;
     else if (colType == BigintType.BIGINT) return DataType.LONG;
     else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
-    else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
     else if (colType == VarcharType.VARCHAR) return DataType.STRING;
     else if (colType == DateType.DATE) return DataType.DATE;
     else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    else if (colType == DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
+        carbondataColumnHandle.getScale())) return DataType.DECIMAL;
     else return DataType.STRING;
   }
 


Mime
View raw message