carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [2/3] incubator-carbondata git commit: add presto integration 0.0.1
Date Thu, 23 Mar 2017 05:13:14 GMT
add presto integration 0.0.1

update pom & annotation


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/56720871
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/56720871
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/56720871

Branch: refs/heads/master
Commit: 56720871a70d0bd3947e54cd5f9353b510e9fb82
Parents: e441ab0
Author: ffpeng90 <ffp900510@126.com>
Authored: Sun Mar 12 20:27:32 2017 +0800
Committer: chenliang613 <chenliang613@huawei.com>
Committed: Thu Mar 23 10:41:51 2017 +0530

----------------------------------------------------------------------
 integration/presto/pom.xml                      | 167 +++++
 integration/presto/src/checkstyle/checks.xml    |   7 +
 .../presto/src/license/LICENSE-HEADER.txt       |  11 +
 .../carbondata/CarbondataColumnConstraint.java  |  99 +++
 .../carbondata/CarbondataColumnHandle.java      | 159 ++++
 .../presto/carbondata/CarbondataConnector.java  |  84 +++
 .../carbondata/CarbondataConnectorFactory.java  |  95 +++
 .../carbondata/CarbondataConnectorId.java       |  57 ++
 .../carbondata/CarbondataHandleResolver.java    |  45 ++
 .../presto/carbondata/CarbondataMetadata.java   | 305 ++++++++
 .../presto/carbondata/CarbondataModule.java     |  79 ++
 .../presto/carbondata/CarbondataPlugin.java     |  32 +
 .../carbondata/CarbondataRecordCursor.java      | 152 ++++
 .../presto/carbondata/CarbondataRecordSet.java  | 111 +++
 .../carbondata/CarbondataRecordSetProvider.java | 292 ++++++++
 .../presto/carbondata/CarbondataSplit.java      |  92 +++
 .../carbondata/CarbondataSplitManager.java      | 300 ++++++++
 .../carbondata/CarbondataTableHandle.java       |  80 ++
 .../carbondata/CarbondataTableLayoutHandle.java |  83 +++
 .../carbondata/CarbondataTransactionHandle.java |  22 +
 .../com/facebook/presto/carbondata/Types.java   |  34 +
 .../carbondata/impl/CarbonLocalInputSplit.java  |  89 +++
 .../carbondata/impl/CarbonTableCacheModel.java  |  40 +
 .../carbondata/impl/CarbonTableConfig.java      |  58 ++
 .../carbondata/impl/CarbonTableReader.java      | 736 +++++++++++++++++++
 25 files changed, 3229 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
new file mode 100644
index 0000000..b9a7e34
--- /dev/null
+++ b/integration/presto/pom.xml
@@ -0,0 +1,167 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>com.facebook.presto</groupId>
+        <artifactId>presto-root</artifactId>
+        <version>0.153</version>
+    </parent>
+
+    <groupId>com.facebook.presto</groupId>
+    <artifactId>presto-carbondata</artifactId>
+    <packaging>presto-plugin</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.facebook.presto</groupId>
+            <artifactId>presto-spi</artifactId>
+            <version>0.153</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>0.9.3</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.carbondata</groupId>
+            <artifactId>carbondata-core</artifactId>
+            <version>1.0.0-incubating</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.carbondata</groupId>
+            <artifactId>carbondata-common</artifactId>
+            <version>1.0.0-incubating</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.carbondata</groupId>
+            <artifactId>carbondata-hadoop</artifactId>
+            <version>1.0.0-incubating</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>bootstrap</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>json</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>log</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.inject</groupId>
+            <artifactId>guice</artifactId>
+        </dependency>
+
+        <!--presto intergated-->
+
+        <dependency>
+            <groupId>com.facebook.presto</groupId>
+            <artifactId>presto-spi</artifactId>
+            <version>0.153</version>
+            <!--<scope>provided</scope>-->
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>slice</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>units</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.facebook.presto.hadoop</groupId>
+            <artifactId>hadoop-apache2</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>1.4.1</version>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>com.ning.maven.plugins</groupId>
+                <artifactId>maven-dependency-versions-check-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                    <failBuildInCaseOfConflict>false</failBuildInCaseOfConflict>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <skip>false</skip>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>com.ning.maven.plugins</groupId>
+                <artifactId>maven-duplicate-finder-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>pl.project13.maven</groupId>
+                <artifactId>git-commit-id-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/checkstyle/checks.xml
----------------------------------------------------------------------
diff --git a/integration/presto/src/checkstyle/checks.xml b/integration/presto/src/checkstyle/checks.xml
new file mode 100755
index 0000000..fb6f41d
--- /dev/null
+++ b/integration/presto/src/checkstyle/checks.xml
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE module PUBLIC
+        "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+        "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<module name="Checker">
+
+</module>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/license/LICENSE-HEADER.txt
----------------------------------------------------------------------
diff --git a/integration/presto/src/license/LICENSE-HEADER.txt b/integration/presto/src/license/LICENSE-HEADER.txt
new file mode 100755
index 0000000..d955a86
--- /dev/null
+++ b/integration/presto/src/license/LICENSE-HEADER.txt
@@ -0,0 +1,11 @@
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnConstraint.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnConstraint.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnConstraint.java
new file mode 100755
index 0000000..29d2076
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnConstraint.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.predicate.Domain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSetter;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataColumnConstraint {
+    private final String name;
+    private final boolean invertedindexed;
+    private Optional<Domain> domain;
+
+    @JsonCreator
+    public CarbondataColumnConstraint(
+            @JsonProperty("name") String name,
+            @JsonProperty("domain") Optional<Domain> domain,
+            @JsonProperty("invertedindexed") boolean invertedindexed)
+    {
+        this.name = requireNonNull(name, "name is null");
+        this.invertedindexed = requireNonNull(invertedindexed, "invertedIndexed is null");
+        this.domain = requireNonNull(domain, "domain is null");
+    }
+
+    @JsonProperty
+    public boolean isInvertedindexed()
+    {
+        return invertedindexed;
+    }
+
+    @JsonProperty
+    public String getName()
+    {
+        return name;
+    }
+
+    @JsonProperty
+    public Optional<Domain> getDomain()
+    {
+        return domain;
+    }
+
+    @JsonSetter
+    public void setDomain(Optional<Domain> domain)
+    {
+        this.domain = domain;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(name, domain, invertedindexed);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj) {
+            return true;
+        }
+
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+
+        CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj;
+        return Objects.equals(this.name, other.name)
+                && Objects.equals(this.domain, other.domain)
+                && Objects.equals(this.invertedindexed, other.invertedindexed);
+    }
+
+    @Override
+    public String toString()
+    {
+        return toStringHelper(this)
+                .add("name", this.name)
+                .add("invertedindexed", this.invertedindexed)
+                .add("domain", this.domain)
+                .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnHandle.java
new file mode 100755
index 0000000..49d140a
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnHandle.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.Type;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataColumnHandle
+    implements ColumnHandle
+{
+        private final String connectorId;
+        private final String columnName;
+
+    public boolean isInvertedIndex() {
+        return isInvertedIndex;
+    }
+
+    private final Type columnType;
+    private final int ordinalPosition;
+    private final int keyOrdinal;
+    private final int columnGroupOrdinal;
+
+    private final int columnGroupId;
+    private final String columnUniqueId;
+    private final boolean isInvertedIndex;
+
+    public boolean isMeasure() {
+        return isMeasure;
+    }
+
+    private final boolean isMeasure;
+
+    public int getKeyOrdinal() {
+        return keyOrdinal;
+    }
+
+    public int getColumnGroupOrdinal() {
+        return columnGroupOrdinal;
+    }
+
+    public int getColumnGroupId() {
+        return columnGroupId;
+    }
+
+    public String getColumnUniqueId() {
+        return columnUniqueId;
+    }
+    /* ordinalPosition of a columnhandle is the -> number of the column in the entire list of columns of this table
+        IT DOESNT DEPEND ON THE QUERY (select clm3, clm0, clm1  from tablename)
+        The columnhandle of clm3 : has ordinalposition = 3
+     */
+
+    @JsonCreator
+    public CarbondataColumnHandle(
+            @JsonProperty("connectorId") String connectorId,
+            @JsonProperty("columnName") String columnName,
+            @JsonProperty("columnType") Type columnType,
+            @JsonProperty("ordinalPosition") int ordinalPosition,
+            @JsonProperty("keyOrdinal") int keyOrdinal,
+            @JsonProperty("columnGroupOrdinal") int columnGroupOrdinal,
+            @JsonProperty("isMeasure") boolean isMeasure,
+            @JsonProperty("columnGroupId") int columnGroupId,
+            @JsonProperty("columnUniqueId") String columnUniqueId,
+            @JsonProperty("isInvertedIndex") boolean isInvertedIndex)
+    {
+        this.connectorId = requireNonNull(connectorId, "connectorId is null");
+        this.columnName = requireNonNull(columnName, "columnName is null");
+        this.columnType = requireNonNull(columnType, "columnType is null");
+
+        this.ordinalPosition = requireNonNull(ordinalPosition, "ordinalPosition is null");
+        this.keyOrdinal = requireNonNull(keyOrdinal, "keyOrdinal is null");
+        this.columnGroupOrdinal = requireNonNull(columnGroupOrdinal, "columnGroupOrdinal is null");
+
+        this.isMeasure = isMeasure;
+        this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null");
+        this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
+        this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
+    }
+
+    @JsonProperty
+    public String getConnectorId()
+    {
+        return connectorId;
+    }
+
+    @JsonProperty
+    public String getColumnName()
+    {
+        return columnName;
+    }
+
+    @JsonProperty
+    public Type getColumnType()
+    {
+        return columnType;
+    }
+
+    @JsonProperty
+    public int getOrdinalPosition()
+    {
+        return ordinalPosition;
+    }
+
+    public ColumnMetadata getColumnMetadata()
+    {
+        return new ColumnMetadata(columnName, columnType, null, false);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(connectorId, columnName);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj) {
+            return true;
+        }
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+
+        CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
+        return Objects.equals(this.connectorId, other.connectorId) &&
+                Objects.equals(this.columnName, other.columnName);
+    }
+
+    @Override
+    public String toString()
+    {
+        return toStringHelper(this)
+                .add("connectorId", connectorId)
+                .add("columnName", columnName)
+                .add("columnType", columnType)
+                .add("ordinalPosition", ordinalPosition)
+                .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnector.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnector.java
new file mode 100755
index 0000000..7d8fdee
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnector.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataConnector
+        implements Connector
+{
+
+    private static final Logger log = Logger.get(CarbondataConnector.class);
+
+    private final LifeCycleManager lifeCycleManager;
+    private final CarbondataMetadata metadata;
+    private final ConnectorSplitManager splitManager;
+    private final ConnectorRecordSetProvider recordSetProvider;
+    private final ClassLoader classLoader;
+
+
+    public CarbondataConnector(LifeCycleManager lifeCycleManager,
+                               CarbondataMetadata metadata,
+                               ConnectorSplitManager splitManager,
+                               ConnectorRecordSetProvider recordSetProvider,
+                               ClassLoader classLoader)
+    {
+        this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+        this.metadata = requireNonNull(metadata, "metadata is null");
+        this.splitManager = requireNonNull(splitManager, "splitManager is null");
+        this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+        this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    }
+
+    @Override
+    public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) {
+        checkConnectorSupports(READ_COMMITTED, isolationLevel);
+        return CarbondataTransactionHandle.INSTANCE;
+    }
+
+    @Override
+    public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
+        metadata.putClassLoader(classLoader);
+        return metadata;
+    }
+
+    @Override
+    public ConnectorSplitManager getSplitManager() {
+        return splitManager;
+    }
+
+    @Override
+    public ConnectorRecordSetProvider getRecordSetProvider()
+    {
+        return recordSetProvider;
+    }
+
+    @Override
+    public final void shutdown()
+    {
+        try {
+            lifeCycleManager.stop();
+        }
+        catch (Exception e) {
+            log.error(e, "Error shutting down connector");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorFactory.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorFactory.java
new file mode 100755
index 0000000..09a3717
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
+import com.google.common.base.Throwables;
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.json.JsonModule;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataConnectorFactory
+        implements ConnectorFactory {
+
+    private final String name;
+    private final ClassLoader classLoader;
+
+    public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader){
+        this.name = connectorName;
+        this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    }
+
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public ConnectorHandleResolver getHandleResolver() {
+        return new CarbondataHandleResolver();
+    }
+
+    @Override
+    public Connector create(String connectorId, Map<String, String> config, ConnectorContext context) {
+        requireNonNull(config, "config is null");
+
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+            Bootstrap app = new Bootstrap(new JsonModule(), new CarbondataModule(connectorId, context.getTypeManager()));
+
+            Injector injector = app
+                    .strictConfig()
+                    .doNotInitializeLogging()
+                    .setRequiredConfigurationProperties(config)
+                    .initialize();
+
+            LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
+            CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
+            //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
+            ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
+            ConnectorRecordSetProvider connectorRecordSet = injector.getInstance(ConnectorRecordSetProvider.class);
+            //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class);
+
+
+            //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
+            //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);
+            //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
+            //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
+            //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
+
+
+            return new CarbondataConnector(
+                    lifeCycleManager,
+                    metadata,
+                    new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
+                    connectorRecordSet,//new ClassLoaderSafeConnectorRecordSetProvider(, classLoader),
+                    classLoader
+                    //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
+                    //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
+                    //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
+            );
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorId.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorId.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorId.java
new file mode 100755
index 0000000..09289e3
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorId.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.google.inject.Inject;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataConnectorId
+{
+    private final String id;
+
+    @Inject
+    public CarbondataConnectorId(String id)
+    {
+        this.id = requireNonNull(id, "id is null");
+    }
+
+    @Override
+    public String toString()
+    {
+        return id;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(id);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj) {
+            return true;
+        }
+
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+
+        return Objects.equals(this.id, ((CarbondataConnectorId) obj).id);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataHandleResolver.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataHandleResolver.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataHandleResolver.java
new file mode 100755
index 0000000..729d9e5
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataHandleResolver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+public class CarbondataHandleResolver implements ConnectorHandleResolver {
+    @Override
+    public Class<? extends ConnectorTableHandle> getTableHandleClass() {
+        return CarbondataTableHandle.class;
+    }
+
+    @Override
+    public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() {
+        return CarbondataTableLayoutHandle.class;
+    }
+
+    @Override
+    public Class<? extends ColumnHandle> getColumnHandleClass() {
+        return CarbondataColumnHandle.class;
+    }
+
+    @Override
+    public Class<? extends ConnectorSplit> getSplitClass() {
+        return CarbondataSplit.class;
+    }
+
+    @Override
+    public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
+    {
+        return CarbondataTransactionHandle.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataMetadata.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataMetadata.java
new file mode 100755
index 0000000..2866149
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataMetadata.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.carbondata.impl.CarbonTableReader;
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+import javax.inject.Inject;
+import java.util.*;
+
+import static com.facebook.presto.carbondata.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataMetadata
+        implements ConnectorMetadata
+{
+    private final String connectorId;
+    private CarbonTableReader carbonTableReader;
+    private ClassLoader classLoader;
+
+    private Map<String, ColumnHandle> columnHandleMap;
+
+    @Inject
+    public CarbondataMetadata(CarbondataConnectorId connectorId, CarbonTableReader reader)
+    {
+        this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+        this.carbonTableReader = requireNonNull(reader, "client is null");
+    }
+
+
+    public void putClassLoader(ClassLoader classLoader)
+    {
+        this.classLoader = classLoader;
+    }
+
+
+    @Override
+    public List<String> listSchemaNames(ConnectorSession session) {
+        return listSchemaNamesInternal();
+    }
+
+
+    public List<String> listSchemaNamesInternal()
+    {
+        List<String> ret;
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+            ret = carbonTableReader.getSchemaNames();
+        }
+        return ret;
+    }
+
+    @Override
+    public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
+
+        /*List<SchemaTableName> all = carbonTableReader.getTableList();
+        if(schemaNameOrNull != null)
+        {
+            return all.stream().filter(a -> schemaNameOrNull.equals(a.getSchemaName())).collect(Collectors.toList());
+        }
+        return all;*/
+
+        List<String> schemaNames;
+        if (schemaNameOrNull != null) {
+            schemaNames = ImmutableList.of(schemaNameOrNull);
+        }
+        else {
+            schemaNames = carbonTableReader.getSchemaNames();
+        }
+
+        ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
+        for (String schemaName : schemaNames) {
+            for (String tableName : carbonTableReader.getTableNames(schemaName)) {
+                builder.add(new SchemaTableName(schemaName, tableName));
+            }
+        }
+        return builder.build();
+    }
+
+    @Override
+    public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) {
+        requireNonNull(prefix, "SchemaTablePrefix is null");
+
+        ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
+        for (SchemaTableName tableName : listTables(session, prefix)) {
+            ConnectorTableMetadata tableMetadata = getTableMetadata(tableName);
+            if (tableMetadata != null) {
+                columns.put(tableName, tableMetadata.getColumns());
+            }
+        }
+        return columns.build();
+    }
+
+    //if prefix is null. return all tables
+    //if prefix is not null, just return this table
+    private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix)
+    {
+        if (prefix.getSchemaName() == null) {
+            return listTables(session, prefix.getSchemaName());
+        }
+        return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
+    }
+
+    private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)
+    {
+        if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
+            return null;
+        }
+
+        CarbonTable cb = carbonTableReader.getTable(tableName);
+        if (cb == null) {
+            return null;
+        }
+
+        List<ColumnMetadata> spiCols = new LinkedList<>();
+        List<CarbonDimension> cols = cb.getDimensionByTableName(tableName.getTableName());
+        for(CarbonDimension col : cols)
+        {
+            //show columns command will return these data
+            Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
+            ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
+            spiCols.add(spiCol);
+        }
+
+        List<CarbonMeasure> mcols = cb.getMeasureByTableName(tableName.getTableName());
+        for(CarbonMeasure mcol : mcols)
+        {
+            Type spiType = CarbondataType2SpiMapper(mcol.getColumnSchema().getDataType());
+            ColumnMetadata spiCol = new ColumnMetadata(mcol.getColumnSchema().getColumnName(), spiType);
+            spiCols.add(spiCol);
+        }
+
+        //封装carbonTable
+        return new ConnectorTableMetadata(tableName, spiCols);
+    }
+
+    @Override
+    public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) {
+
+        CarbondataTableHandle handle = checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+        checkArgument(handle.getConnectorId().equals(connectorId), "tableHandle is not for this connector");
+
+        String schemaName = handle.getSchemaTableName().getSchemaName();
+        if (!listSchemaNamesInternal().contains(schemaName)) {
+            throw new SchemaNotFoundException(schemaName);
+        }
+
+        //CarbonTable(official struct) is stored in CarbonMetadata(official struct)
+        CarbonTable cb = carbonTableReader.getTable(handle.getSchemaTableName());
+        if (cb == null) {
+            throw new TableNotFoundException(handle.getSchemaTableName());
+        }
+
+        ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
+        int index = 0;
+        String tableName = handle.getSchemaTableName().getTableName();
+        for (CarbonDimension column : cb.getDimensionByTableName(tableName)) {
+            ColumnSchema cs = column.getColumnSchema();
+
+            int complex = column.getComplexTypeOrdinal();
+            column.getNumberOfChild();
+            column.getListOfChildDimensions();
+
+            Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+            columnHandles.put(
+                    cs.getColumnName(),
+                    new CarbondataColumnHandle(
+                            connectorId,
+                            cs.getColumnName(),
+                            spiType,
+                            index,
+                            column.getKeyOrdinal(),
+                            column.getColumnGroupOrdinal(),
+                            false,
+                            cs.getColumnGroupId(),
+                            cs.getColumnUniqueId(),
+                            cs.isUseInvertedIndex()));
+            index++;
+        }
+
+        for(CarbonMeasure measure : cb.getMeasureByTableName(tableName)){
+            ColumnSchema cs = measure.getColumnSchema();
+
+            Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+            columnHandles.put(
+                    cs.getColumnName(),
+                    new CarbondataColumnHandle(
+                            connectorId,
+                            cs.getColumnName(),
+                            spiType,
+                            index,
+                            measure.getOrdinal(),
+                            cs.getColumnGroupId(),
+                            true,
+                            cs.getColumnGroupId(),
+                            cs.getColumnUniqueId(),
+                            cs.isUseInvertedIndex()));
+            index++;
+        }
+
+        //should i cache it?
+        columnHandleMap = columnHandles.build();
+
+        return columnHandleMap;
+    }
+
+    @Override
+    public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
+
+        checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+        return checkType(columnHandle, CarbondataColumnHandle.class, "columnHandle").getColumnMetadata();
+    }
+
+    @Override
+    public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
+        //check tablename is valid
+        //schema is exist
+        //tables is exist
+
+        //CarbondataTable  get from jar
+        return new CarbondataTableHandle(connectorId, tableName);
+    }
+
+    @Override
+    public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) {
+        CarbondataTableHandle handle = checkType(table, CarbondataTableHandle.class, "table");
+        ConnectorTableLayout layout = new ConnectorTableLayout(new CarbondataTableLayoutHandle(handle,constraint.getSummary()/*, constraint.getPredicateMap(),constraint.getFilterTuples()*/));
+        return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+    }
+
+    @Override
+    public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) {
+        return new ConnectorTableLayout(handle);
+    }
+
+    @Override
+    public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) {
+        return getTableMetadataInternal(table);
+    }
+
+    public ConnectorTableMetadata getTableMetadataInternal(ConnectorTableHandle table)
+    {
+        CarbondataTableHandle carbondataTableHandle = checkType(table, CarbondataTableHandle.class, "table");
+        checkArgument(carbondataTableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector");
+        return getTableMetadata(carbondataTableHandle.getSchemaTableName());
+    }
+
+
+    public static Type CarbondataType2SpiMapper(DataType colType)
+    {
+        switch (colType)
+        {
+            case BOOLEAN:
+                return BooleanType.BOOLEAN;
+            case SHORT:
+                return SmallintType.SMALLINT;
+            case INT:
+                return IntegerType.INTEGER;
+            case LONG:
+                return BigintType.BIGINT;
+            case FLOAT:
+            case DOUBLE:
+                return DoubleType.DOUBLE;
+
+            case DECIMAL:
+                return DecimalType.createDecimalType();
+            case STRING:
+                return VarcharType.VARCHAR;
+            case DATE:
+                return DateType.DATE;
+            case TIMESTAMP:
+                return TimestampType.TIMESTAMP;
+
+            /*case DataType.MAP:
+            case DataType.ARRAY:
+            case DataType.STRUCT:
+            case DataType.NULL:*/
+
+            default:
+                return VarcharType.VARCHAR;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataModule.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataModule.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataModule.java
new file mode 100755
index 0000000..af62a75
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataModule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.carbondata.impl.CarbonTableConfig;
+import com.facebook.presto.carbondata.impl.CarbonTableReader;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.TypeManager;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import javax.inject.Inject;
+
+import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataModule implements Module {
+
+    private final String connectorId;
+    private final TypeManager typeManager;
+
+    public CarbondataModule(String connectorId, TypeManager typeManager)
+    {
+        this.connectorId = requireNonNull(connectorId, "connector id is null");
+        this.typeManager = requireNonNull(typeManager, "typeManager is null");
+    }
+
+    @Override
+    public void configure(Binder binder) {
+        binder.bind(TypeManager.class).toInstance(typeManager);
+
+        binder.bind(CarbondataConnectorId.class).toInstance(new CarbondataConnectorId(connectorId));
+        binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
+        binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
+        binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
+        binder.bind(ConnectorRecordSetProvider.class).to(CarbondataRecordSetProvider.class).in(Scopes.SINGLETON);
+        binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
+        configBinder(binder).bindConfig(CarbonTableConfig.class);
+    }
+
+    public static final class TypeDeserializer
+            extends FromStringDeserializer<Type>
+    {
+        private final TypeManager typeManager;
+
+        @Inject
+        public TypeDeserializer(TypeManager typeManager)
+        {
+            super(Type.class);
+            this.typeManager = requireNonNull(typeManager, "typeManager is null");
+        }
+
+        @Override
+        protected Type _deserialize(String value, DeserializationContext context)
+        {
+            Type type = typeManager.getType(parseTypeSignature(value));
+            checkArgument(type != null, "Unknown type %s", value);
+            return type;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataPlugin.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataPlugin.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataPlugin.java
new file mode 100755
index 0000000..bd6a156
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataPlugin.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.Plugin;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.google.common.collect.ImmutableList;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+public class CarbondataPlugin implements Plugin {
+
+    @Override
+    public Iterable<ConnectorFactory> getConnectorFactories()
+    {
+        return ImmutableList.of(new CarbondataConnectorFactory("carbondata", getClassLoader()));
+    }
+
+    private static ClassLoader getClassLoader() {
+        return FileFactory.class.getClassLoader();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordCursor.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordCursor.java
new file mode 100755
index 0000000..43ca876
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordCursor.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.Type;
+import com.google.common.base.Strings;
+import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
+import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+public class CarbondataRecordCursor implements RecordCursor {
+
+    private static final Logger log = Logger.get(CarbondataRecordCursor.class);
+    private final List<CarbondataColumnHandle> columnHandles;
+
+    private List<String> fields;
+    private CarbondataSplit split;
+    private CarbonIterator<Object[]> rowCursor;
+    private CarbonReadSupport<Object[]> readSupport;
+
+    private long totalBytes;
+    private long nanoStart;
+    private long nanoEnd;
+
+    public CarbondataRecordCursor(CarbonReadSupport<Object[]> readSupport, CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles, CarbondataSplit split) {
+        this.rowCursor = carbonIterator;
+        this.columnHandles = columnHandles;
+        this.readSupport = readSupport;
+        this.totalBytes = 0;
+    }
+
+
+    @Override
+    public long getTotalBytes() {
+        return totalBytes;
+    }
+
+    @Override
+    public long getCompletedBytes() {
+        return totalBytes;
+    }
+
+    @Override
+    public long getReadTimeNanos() {
+        return nanoStart > 0L ? (nanoEnd == 0 ? System.nanoTime() : nanoEnd) - nanoStart : 0L;
+    }
+
+    @Override
+    public Type getType(int field) {
+
+        checkArgument(field < columnHandles.size(), "Invalid field index");
+        return columnHandles.get(field).getColumnType();
+    }
+
+    @Override
+    public boolean advanceNextPosition() {
+
+        if (nanoStart == 0) {
+            nanoStart = System.nanoTime();
+        }
+
+        if(rowCursor.hasNext())
+        {
+            fields = Stream.of(readSupport.readRow(rowCursor.next())).map(a -> a.toString()).collect(Collectors.toList());
+
+            totalBytes += fields.size();
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean getBoolean(int field) {
+        checkFieldType(field, BOOLEAN);
+        return Boolean.parseBoolean(getFieldValue(field));
+    }
+
+    @Override
+    public long getLong(int field) {
+        String timeStr = getFieldValue(field);
+        Long milliSec = 0L;
+
+        //suppose the
+        return Math.round(Double.parseDouble(getFieldValue(field)));
+    }
+
+    @Override
+    public double getDouble(int field) {
+        checkFieldType(field, DOUBLE);
+        return Double.parseDouble(getFieldValue(field));
+    }
+
+    @Override
+    public Slice getSlice(int field) {
+        checkFieldType(field, VARCHAR);
+        return Slices.utf8Slice(getFieldValue(field));
+    }
+
+    @Override
+    public Object getObject(int field) {
+        return null;
+    }
+
+    @Override
+    public boolean isNull(int field) {
+        checkArgument(field < columnHandles.size(), "Invalid field index");
+        return Strings.isNullOrEmpty(getFieldValue(field));
+    }
+
+    String getFieldValue(int field)
+    {
+        checkState(fields != null, "Cursor has not been advanced yet");
+        return fields.get(field);
+    }
+
+    private void checkFieldType(int field, Type expected)
+    {
+        Type actual = getType(field);
+        checkArgument(actual.equals(expected), "Expected field %s to be type %s but is %s", field, expected, actual);
+    }
+
+    @Override
+    public void close() {
+        nanoEnd = System.nanoTime();
+
+        //todo  delete cache from readSupport
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSet.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSet.java
new file mode 100755
index 0000000..622ef8a
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSet.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.Type;
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.facebook.presto.carbondata.Types.checkType;
+
+public class CarbondataRecordSet implements RecordSet {
+
+    private CarbonTable carbonTable;
+    private TupleDomain<ColumnHandle> originalConstraint;
+    private Expression carbonConstraint;
+    private List<CarbondataColumnConstraint> rebuildConstraints;
+    private QueryModel queryModel;
+    private CarbondataSplit split;
+    private List<CarbondataColumnHandle> columns;
+    private QueryExecutor queryExecutor;
+
+    private CarbonReadSupport<Object[]> readSupport;
+
+    public CarbondataRecordSet(
+            CarbonTable carbonTable,
+            ConnectorSession session,
+            ConnectorSplit split,
+            List<CarbondataColumnHandle> columns,
+            QueryModel queryModel){
+        this.carbonTable = carbonTable;
+        this.split = checkType(split, CarbondataSplit.class, "connectorSplit");
+        this.originalConstraint = this.split.getConstraints();
+        this.rebuildConstraints = this.split.getRebuildConstraints();
+        this.queryModel = queryModel;
+        this.columns = columns;
+        this.readSupport = new DictionaryDecodedReadSupportImpl();
+    }
+
+    //todo support later
+    private Expression parseConstraint2Expression(TupleDomain<ColumnHandle> constraints) {
+        return null;
+    }
+
+    @Override
+    public List<Type> getColumnTypes() {
+        return columns.stream().map(a -> a.getColumnType()).collect(Collectors.toList());
+    }
+
+    @Override
+    public RecordCursor cursor() {
+        List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
+
+        //tableBlockInfoList.add(split.getLocalInputSplit().getTableBlockInfo());
+        /*BlockletInfos blockletInfos = new BlockletInfos(split.getLocalInputSplit().getNumberOfBlocklets(), 0,
+                split.getLocalInputSplit().getNumberOfBlocklets());*/
+        tableBlockInfoList.add(
+                new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
+                        split.getLocalInputSplit().getStart(),
+                        split.getLocalInputSplit().getSegmentId(),
+                        split.getLocalInputSplit().getLocations().toArray(new String[0]),
+                        split.getLocalInputSplit().getLength(),
+                        //blockletInfos,
+                        ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion())));
+        queryModel.setTableBlockInfos(tableBlockInfoList);
+
+        queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+
+        //queryModel.setQueryId(queryModel.getQueryId() + "_" + split.getLocalInputSplit().getSegmentId());
+        try {
+            readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
+            CarbonIterator<Object[]> carbonIterator = new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
+            RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
+            return rc;
+        } catch (QueryExecutionException e) {
+            //throw new InterruptedException(e.getMessage());
+            System.out.println(e.getMessage());
+        } catch(Exception ex) {
+            System.out.println(ex.toString());
+        }
+        return null;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSetProvider.java
new file mode 100755
index 0000000..9e82b93
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSetProvider.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.carbondata.impl.CarbonTableCacheModel;
+import com.facebook.presto.carbondata.impl.CarbonTableReader;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.facebook.presto.carbondata.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
+
+    private final String connectorId;
+    private final CarbonTableReader carbonTableReader;
+
+    @Inject
+    public CarbondataRecordSetProvider(
+            CarbondataConnectorId connectorId,
+            CarbonTableReader reader)
+    {
+        //this.config = requireNonNull(config, "config is null");
+        //this.connector = requireNonNull(connector, "connector is null");
+        this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+        this.carbonTableReader = reader;
+    }
+
+    @Override
+    public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
+        requireNonNull(split, "split is null");
+        requireNonNull(columns, "columns is null");
+
+        // Convert split
+        CarbondataSplit cdSplit = checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
+        checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+
+        // Convert all columns handles
+        ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
+        for (ColumnHandle handle : columns) {
+            handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
+        }
+
+        // Build column projection(check the column order)
+        String targetCols = "";
+        for(ColumnHandle col : columns){
+            targetCols += ((CarbondataColumnHandle)col).getColumnName() + ",";
+        }
+        targetCols = targetCols.substring(0, targetCols.length() -1 );
+        //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
+
+        CarbonTableCacheModel tableCacheModel = carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+        checkNotNull(tableCacheModel, "tableCacheModel should not be null");
+        checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
+        checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
+
+        // Build Query Model
+        CarbonTable targetTable = tableCacheModel.carbonTable;
+        CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
+        QueryModel queryModel = QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
+
+        // Push down filter
+        fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+
+        // Return new record set
+        return new CarbondataRecordSet(targetTable,/*connector,*/ session, /*config, */cdSplit, handles.build(), queryModel);
+    }
+
+    // Build filter for QueryModel (copy from CarbonInputFormat=> createRecordReader)
+    private void fillFilter2QueryModel(QueryModel queryModel, TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
+
+        //queryModel.setFilterExpressionResolverTree(new FilterResolverIntf());
+
+        //Build Predicate Expression
+        ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+        Domain domain = null;
+
+        for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+            // Build ColumnExpresstion for Expresstion(Carbondata)
+            CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+            Type type = cdch.getColumnType();
+
+            DataType coltype = Spi2CarbondataTypeMapper(type);
+            Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
+
+            domain = originalConstraint.getDomains().get().get(c);
+            checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+            if (domain.getValues().isNone()) {
+                //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
+                //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
+                //new Expression()
+            }
+
+            if (domain.getValues().isAll()) {
+                //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
+                //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
+            }
+
+            List<Object> singleValues = new ArrayList<>();
+            List<Expression> rangeFilter = new ArrayList<>();
+            for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+                checkState(!range.isAll()); // Already checked
+                if (range.isSingleValue()) {
+                    singleValues.add(range.getLow().getValue());
+                }
+                else
+                {
+                    List<String> rangeConjuncts = new ArrayList<>();
+                    if (!range.getLow().isLowerUnbounded()) {
+                        Object value = ConvertDataByType(range.getLow().getValue(), type);
+                        switch (range.getLow().getBound()) {
+                            case ABOVE:
+                                if (type == TimestampType.TIMESTAMP) {
+                                    //todo not now
+                                } else {
+                                    GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype));
+                                    //greater.setRangeExpression(true);
+                                    rangeFilter.add(greater);
+                                }
+                                break;
+                            case EXACTLY:
+                                GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
+                                //greater.setRangeExpression(true);
+                                rangeFilter.add(greater);
+                                break;
+                            case BELOW:
+                                throw new IllegalArgumentException("Low marker should never use BELOW bound");
+                            default:
+                                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
+                        }
+                    }
+                    if (!range.getHigh().isUpperUnbounded()) {
+                        Object value = ConvertDataByType(range.getHigh().getValue(), type);
+                        switch (range.getHigh().getBound()) {
+                            case ABOVE:
+                                throw new IllegalArgumentException("High marker should never use ABOVE bound");
+                            case EXACTLY:
+                                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
+                                //less.setRangeExpression(true);
+                                rangeFilter.add(less);
+                                break;
+                            case BELOW:
+                                LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+                                //less2.setRangeExpression(true);
+                                rangeFilter.add(less2);
+                                break;
+                            default:
+                                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
+                        }
+                    }
+                }
+            }
+
+            if (singleValues.size() == 1) {
+                Expression ex = null;
+                if (coltype.equals(DataType.STRING)) {
+                    ex = new EqualToExpression(colExpression, new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+                } else
+                    ex = new EqualToExpression(colExpression, new LiteralExpression(singleValues.get(0), coltype));
+                filters.add(ex);
+            }
+            else if(singleValues.size() > 1) {
+                ListExpression candidates = null;
+                List<Expression> exs = singleValues.stream().map((a) ->
+                {
+                    return new LiteralExpression(ConvertDataByType(a, type), coltype);
+                }).collect(Collectors.toList());
+                candidates = new ListExpression(exs);
+
+                if(candidates != null)
+                    filters.add(new InExpression(colExpression, candidates));
+            }
+            else if(rangeFilter.size() > 0){
+                if(rangeFilter.size() > 1) {
+                    Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
+                    if(rangeFilter.size() > 2)
+                    {
+                        for(int i = 2; i< rangeFilter.size(); i++)
+                        {
+                            filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
+                        }
+                    }
+                }
+                else if(rangeFilter.size() == 1)
+                    filters.add(rangeFilter.get(0));
+            }
+        }
+
+        Expression finalFilters;
+        List<Expression> tmp = filters.build();
+        if(tmp.size() > 1) {
+            finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+            if(tmp.size() > 2)
+            {
+                for(int i = 2; i< tmp.size(); i++)
+                {
+                    finalFilters = new AndExpression(finalFilters, tmp.get(i));
+                }
+            }
+        }
+        else if(tmp.size() == 1)
+            finalFilters = tmp.get(0);
+        else
+            return;
+
+        // todo set into QueryModel
+        CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable);
+        queryModel.setFilterExpressionResolverTree(CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
+    }
+
+    public static DataType Spi2CarbondataTypeMapper(Type colType)
+    {
+        if(colType == BooleanType.BOOLEAN)
+            return DataType.BOOLEAN;
+        else if(colType == SmallintType.SMALLINT)
+            return DataType.SHORT;
+        else if(colType == IntegerType.INTEGER)
+            return DataType.INT;
+        else if(colType == BigintType.BIGINT)
+            return DataType.LONG;
+        else if(colType == DoubleType.DOUBLE)
+            return DataType.DOUBLE;
+        else if(colType == DecimalType.createDecimalType())
+            return DataType.DECIMAL;
+        else if(colType == VarcharType.VARCHAR)
+            return DataType.STRING;
+        else if(colType == DateType.DATE)
+            return DataType.DATE;
+        else if(colType == TimestampType.TIMESTAMP)
+            return DataType.TIMESTAMP;
+        else
+            return DataType.STRING;
+    }
+
+
+    public Object ConvertDataByType(Object rawdata, Type type)
+    {
+        if(type.equals(IntegerType.INTEGER))
+            return new Integer((rawdata.toString()));
+        else if(type.equals(BigintType.BIGINT))
+            return (Long)rawdata;
+        else if(type.equals(VarcharType.VARCHAR))
+            return ((Slice)rawdata).toStringUtf8();
+        else if(type.equals(BooleanType.BOOLEAN))
+            return (Boolean)(rawdata);
+
+        return rawdata;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplit.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplit.java
new file mode 100755
index 0000000..eb3db1e
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplit.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.carbondata.impl.CarbonLocalInputSplit;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.HostAddress;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataSplit implements ConnectorSplit {
+
+    private final String connectorId;
+    private final SchemaTableName schemaTableName;
+    private final TupleDomain<ColumnHandle> constraints;
+    private final CarbonLocalInputSplit localInputSplit;
+    private final List<CarbondataColumnConstraint> rebuildConstraints;
+    private final ImmutableList<HostAddress> addresses;
+
+    @JsonCreator
+    public CarbondataSplit( @JsonProperty("connectorId") String connectorId,
+                            @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
+                            @JsonProperty("constraints") TupleDomain<ColumnHandle> constraints,
+                            @JsonProperty("localInputSplit") CarbonLocalInputSplit localInputSplit,
+                            @JsonProperty("rebuildConstraints") List<CarbondataColumnConstraint> rebuildConstraints) {
+        this.connectorId = requireNonNull(connectorId, "connectorId is null");
+        this.schemaTableName = requireNonNull(schemaTableName, "schemaTable is null");
+        this.constraints = requireNonNull(constraints, "constraints is null");
+        this.localInputSplit = requireNonNull(localInputSplit, "localInputSplit is null");
+        this.rebuildConstraints = requireNonNull(rebuildConstraints, "rebuildConstraints is null");
+        this.addresses = ImmutableList.of();
+    }
+
+
+    @JsonProperty
+    public String getConnectorId() {
+        return connectorId;
+    }
+
+    @JsonProperty
+    public SchemaTableName getSchemaTableName(){
+        return  schemaTableName;
+    }
+
+    @JsonProperty
+    public TupleDomain<ColumnHandle> getConstraints() {
+        return constraints;
+    }
+
+    @JsonProperty
+    public CarbonLocalInputSplit getLocalInputSplit(){return localInputSplit;}
+
+    @JsonProperty
+    public List<CarbondataColumnConstraint> getRebuildConstraints() {
+        return rebuildConstraints;
+    }
+
+    @Override
+    public boolean isRemotelyAccessible() {
+        return true;
+    }
+
+    @Override
+    public List<HostAddress> getAddresses() {
+        return addresses;
+    }
+
+    @Override
+    public Object getInfo() {
+        return this;
+    }
+}
+


Mime
View raw message