carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [1/3] incubator-carbondata git commit: add presto integration 0.0.1
Date Thu, 23 Mar 2017 05:13:13 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master e441ab0d4 -> 9d7dbea38


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java
new file mode 100755
index 0000000..692d69e
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.carbondata.impl.CarbonLocalInputSplit;
+import com.facebook.presto.carbondata.impl.CarbonTableCacheModel;
+import com.facebook.presto.carbondata.impl.CarbonTableReader;
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.facebook.presto.carbondata.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataSplitManager
+        implements ConnectorSplitManager
+{
+
+    private final String connectorId;
+    private final CarbonTableReader carbonTableReader;
+
+    @Inject
+    public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader)
+    {
+        this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+        this.carbonTableReader = requireNonNull(reader, "client is null");
+    }
+
+    public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
+    {
+        CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle)layout;
+        CarbondataTableHandle tableHandle = layoutHandle.getTable();
+        SchemaTableName key = tableHandle.getSchemaTableName();
+
+        //get all filter domain
+        List<CarbondataColumnConstraint> rebuildConstraints = getColumnConstraints(layoutHandle.getConstraint());
+
+        CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
+        Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable);
+
+        if(cache != null) {
+            try {
+                List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters);
+
+                ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
+                for (CarbonLocalInputSplit split : splits) {
+                    cSplits.add(new CarbondataSplit(
+                            connectorId,
+                            tableHandle.getSchemaTableName(),
+                            layoutHandle.getConstraint(),
+                            split,
+                            rebuildConstraints
+                    ));
+                }
+                return new FixedSplitSource(cSplits.build());
+            } catch (Exception ex) {
+                System.out.println(ex.toString());
+            }
+        }
+        return null;
+    }
+
+
+    public List<CarbondataColumnConstraint> getColumnConstraints(TupleDomain<ColumnHandle> constraint)
+    {
+        ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
+        for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains().get()) {
+            CarbondataColumnHandle columnHandle = checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
+
+            constraintBuilder.add(new CarbondataColumnConstraint(
+                    columnHandle.getColumnName(),
+                    Optional.of(columnDomain.getDomain()),
+                    columnHandle.isInvertedIndex()));
+        }
+
+        return constraintBuilder.build();
+    }
+
+
+    public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable)
+    {
+        ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+        Domain domain = null;
+
+        for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+            CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+            Type type = cdch.getColumnType();
+
+            List<CarbonColumn> ccols = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName());
+            Optional<CarbonColumn>  target = ccols.stream().filter(a -> a.getColName().equals(cdch.getColumnName())).findFirst();
+
+            if(target.get() == null)
+                return null;
+
+            DataType coltype = target.get().getDataType();
+            ColumnExpression colExpression = new ColumnExpression(cdch.getColumnName(), target.get().getDataType());
+            //colExpression.setColIndex(cs.getSchemaOrdinal());
+            colExpression.setDimension(target.get().isDimesion());
+            colExpression.setDimension(carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName()));
+            colExpression.setCarbonColumn(target.get());
+
+            domain = originalConstraint.getDomains().get().get(c);
+            checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+            if (domain.getValues().isNone()) {
+                //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
+                //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
+                //new Expression()
+            }
+
+            if (domain.getValues().isAll()) {
+                //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
+                //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
+            }
+
+            List<Object> singleValues = new ArrayList<>();
+            List<Expression> rangeFilter = new ArrayList<>();
+            for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+                checkState(!range.isAll()); // Already checked
+                if (range.isSingleValue()) {
+                    singleValues.add(range.getLow().getValue());
+                }
+                else
+                {
+                    List<String> rangeConjuncts = new ArrayList<>();
+                    if (!range.getLow().isLowerUnbounded()) {
+                        Object value = ConvertDataByType(range.getLow().getValue(), type);
+                        switch (range.getLow().getBound()) {
+                            case ABOVE:
+                                if (type == TimestampType.TIMESTAMP) {
+                                    //todo not now
+                                } else {
+                                    GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype));
+                                    //greater.setRangeExpression(true);
+                                    rangeFilter.add(greater);
+                                }
+                                break;
+                            case EXACTLY:
+                                GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
+                                //greater.setRangeExpression(true);
+                                rangeFilter.add(greater);
+                                break;
+                            case BELOW:
+                                throw new IllegalArgumentException("Low marker should never use BELOW bound");
+                            default:
+                                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
+                        }
+                    }
+                    if (!range.getHigh().isUpperUnbounded()) {
+                        Object value = ConvertDataByType(range.getHigh().getValue(), type);
+                        switch (range.getHigh().getBound()) {
+                            case ABOVE:
+                                throw new IllegalArgumentException("High marker should never use ABOVE bound");
+                            case EXACTLY:
+                                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
+                                //less.setRangeExpression(true);
+                                rangeFilter.add(less);
+                                break;
+                            case BELOW:
+                                LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+                                //less2.setRangeExpression(true);
+                                rangeFilter.add(less2);
+                                break;
+                            default:
+                                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
+                        }
+                    }
+                }
+            }
+
+            if (singleValues.size() == 1) {
+                Expression ex = null;
+                if (coltype.equals(DataType.STRING)) {
+                    ex = new EqualToExpression(colExpression, new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+                } else
+                    ex = new EqualToExpression(colExpression, new LiteralExpression(singleValues.get(0), coltype));
+                filters.add(ex);
+            }
+            else if(singleValues.size() > 1) {
+                ListExpression candidates = null;
+                List<Expression> exs = singleValues.stream().map((a) ->
+                {
+                    return new LiteralExpression(ConvertDataByType(a, type), coltype);
+                }).collect(Collectors.toList());
+                candidates = new ListExpression(exs);
+
+                if(candidates != null)
+                    filters.add(new InExpression(colExpression, candidates));
+            }
+            else if(rangeFilter.size() > 0){
+                if(rangeFilter.size() > 1) {
+                    Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
+                    if(rangeFilter.size() > 2)
+                    {
+                        for(int i = 2; i< rangeFilter.size(); i++)
+                        {
+                            filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
+                        }
+                    }
+                }
+                else if(rangeFilter.size() == 1)//only have one value
+                    filters.add(rangeFilter.get(0));
+            }
+        }
+
+        Expression finalFilters;
+        List<Expression> tmp = filters.build();
+        if(tmp.size() > 1) {
+            finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+            if(tmp.size() > 2)
+            {
+                for(int i = 2; i< tmp.size(); i++)
+                {
+                    finalFilters = new AndExpression(finalFilters, tmp.get(i));
+                }
+            }
+        }
+        else if(tmp.size() == 1)
+            finalFilters = tmp.get(0);
+        else//no filter
+            return null;
+
+        return finalFilters;
+    }
+
+    public static DataType Spi2CarbondataTypeMapper(Type colType)
+    {
+        if(colType == BooleanType.BOOLEAN)
+            return DataType.BOOLEAN;
+        else if(colType == SmallintType.SMALLINT)
+            return DataType.SHORT;
+        else if(colType == IntegerType.INTEGER)
+            return DataType.INT;
+        else if(colType == BigintType.BIGINT)
+            return DataType.LONG;
+        else if(colType == DoubleType.DOUBLE)
+            return DataType.DOUBLE;
+        else if(colType == DecimalType.createDecimalType())
+            return DataType.DECIMAL;
+        else if(colType == VarcharType.VARCHAR)
+            return DataType.STRING;
+        else if(colType == DateType.DATE)
+            return DataType.DATE;
+        else if(colType == TimestampType.TIMESTAMP)
+            return DataType.TIMESTAMP;
+        else
+            return DataType.STRING;
+    }
+
+
+    public Object ConvertDataByType(Object rawdata, Type type)
+    {
+        if(type.equals(IntegerType.INTEGER))
+            return new Integer((rawdata.toString()));
+        else if(type.equals(BigintType.BIGINT))
+            return (Long)rawdata;
+        else if(type.equals(VarcharType.VARCHAR))
+            return ((Slice)rawdata).toStringUtf8();
+        else if(type.equals(BooleanType.BOOLEAN))
+            return (Boolean)(rawdata);
+
+        return rawdata;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java
new file mode 100755
index 0000000..6b263e0
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.SchemaTableName;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+
+import java.util.Objects;
+
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataTableHandle
+        implements ConnectorTableHandle {
+
+    private final String connectorId;
+    private final SchemaTableName schemaTableName;
+
+    /**
+     * Creates a handle for {@code schemaTableName} in connector {@code connectorId}.
+     * The connector id is stored lower-cased (English locale).
+     *
+     * @throws NullPointerException if either argument is null
+     */
+    @JsonCreator
+    public CarbondataTableHandle(
+            @JsonProperty("connectorId") String connectorId,
+            @JsonProperty("schemaTableName") SchemaTableName schemaTableName)
+    {
+        // Null-check before lower-casing so a missing id fails with the intended message.
+        this.connectorId = requireNonNull(connectorId, "connectorId is null").toLowerCase(ENGLISH);
+        // equals() and toString() dereference this field, so reject null up front.
+        this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
+    }
+
+    @JsonProperty
+    public String getConnectorId()
+    {
+        return connectorId;
+    }
+
+    @JsonProperty
+    public SchemaTableName getSchemaTableName()
+    {
+        return schemaTableName;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(connectorId, schemaTableName);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj) {
+            return true;
+        }
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+
+        CarbondataTableHandle other = (CarbondataTableHandle) obj;
+        return Objects.equals(this.connectorId, other.connectorId)
+                && Objects.equals(this.schemaTableName, other.schemaTableName);
+    }
+
+    /** Renders the handle as {@code connectorId:schemaTableName}. */
+    @Override
+    public String toString()
+    {
+        return Joiner.on(":").join(connectorId, schemaTableName.toString());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java
new file mode 100755
index 0000000..01434bd
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataTableLayoutHandle
+        implements ConnectorTableLayoutHandle
+{
+    /** Table this layout belongs to. */
+    private final CarbondataTableHandle table;
+    /** Predicate describing the rows covered by this layout. */
+    private final TupleDomain<ColumnHandle> constraint;
+
+    /**
+     * Creates a layout of {@code table} restricted by {@code constraint}.
+     *
+     * @throws NullPointerException if either argument is null
+     */
+    @JsonCreator
+    public CarbondataTableLayoutHandle(@JsonProperty("table") CarbondataTableHandle table,
+                                       @JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
+    {
+        CarbondataTableHandle checkedTable = requireNonNull(table, "table is null");
+        TupleDomain<ColumnHandle> checkedConstraint = requireNonNull(constraint, "constraint is null");
+        this.table = checkedTable;
+        this.constraint = checkedConstraint;
+    }
+
+    @JsonProperty
+    public CarbondataTableHandle getTable()
+    {
+        return this.table;
+    }
+
+    @JsonProperty
+    public TupleDomain<ColumnHandle> getConstraint()
+    {
+        return this.constraint;
+    }
+
+    /** Two layouts are equal when they have the same runtime class, table and constraint. */
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (obj == this) {
+            return true;
+        }
+        if (obj != null && obj.getClass() == getClass()) {
+            CarbondataTableLayoutHandle that = (CarbondataTableLayoutHandle) obj;
+            return Objects.equals(this.table, that.table) && Objects.equals(this.constraint, that.constraint);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(this.table, this.constraint);
+    }
+
+    @Override
+    public String toString()
+    {
+        return toStringHelper(this).add("table", this.table).add("constraint", this.constraint).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java
new file mode 100755
index 0000000..a643a33
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+/**
+ * Transaction handle for the CarbonData connector; a single shared constant is used for every transaction.
+ */
+public enum CarbondataTransactionHandle
+        implements ConnectorTransactionHandle
+{
+    INSTANCE;
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java
new file mode 100755
index 0000000..5212dad
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import java.util.Locale;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class Types {
+    private Types() {}
+
+    public static <A, B extends A> B checkType(A value, Class<B> target, String name)
+    {
+        requireNonNull(value, String.format(Locale.ENGLISH, "%s is null", name));
+        checkArgument(target.isInstance(value),
+                "%s must be of type %s, not %s",
+                name,
+                target.getName(),
+                value.getClass().getName());
+        return target.cast(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java
new file mode 100755
index 0000000..6084022
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * JSON-serialisable description of one CarbonData input split.
+ * It records the segment, file path, byte range, host locations, blocklet count and version.
+ */
+public class CarbonLocalInputSplit {
+
+    private final String segmentId;
+    private final String path;
+    private final long start;
+    private final long length;
+    private final List<String> locations;
+    private final short version;
+    /**
+     * Number of blocklets in the block.
+     */
+    private final int numberOfBlocklets;
+
+
+    @JsonProperty
+    public short getVersion(){
+        return version;
+    }
+
+    @JsonProperty
+    public List<String> getLocations() {
+        return locations;
+    }
+
+    @JsonProperty
+    public long getLength() {
+        return length;
+    }
+
+    @JsonProperty
+    public long getStart() {
+        return start;
+    }
+
+    @JsonProperty
+    public String getPath() {
+        return path;
+    }
+
+    @JsonProperty
+    public String getSegmentId() {
+        return segmentId;
+    }
+
+    @JsonProperty
+    public int getNumberOfBlocklets() {
+        return numberOfBlocklets;
+    }
+
+    /**
+     * Creates a split; also used by Jackson for deserialisation.
+     * The {@code locations} list is stored by reference, not copied.
+     */
+    @JsonCreator
+    public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId,
+                                 @JsonProperty("path") String path,
+                                 @JsonProperty("start") long start,
+                                 @JsonProperty("length") long length,
+                                 @JsonProperty("locations") List<String> locations,
+                                 @JsonProperty("numberOfBlocklets") int numberOfBlocklets,
+                                 @JsonProperty("version") short version) {
+        this.path = path;
+        this.start = start;
+        this.length = length;
+        this.segmentId = segmentId;
+        this.locations = locations;
+        this.numberOfBlocklets = numberOfBlocklets;
+        this.version = version;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java
new file mode 100755
index 0000000..d47f2a5
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata.impl;
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+public class CarbonTableCacheModel {
+
+    public CarbonTableIdentifier carbonTableIdentifier;
+    public CarbonTablePath carbonTablePath;
+
+    public TableInfo tableInfo;
+    public CarbonTable carbonTable;
+    public String[] segments;
+
+    public boolean isValid()
+    {
+        if(carbonTable != null
+                && carbonTablePath != null
+                && carbonTableIdentifier != null)
+            return true;
+        else
+            return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java
new file mode 100755
index 0000000..cd52b85
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata.impl;
+
+import io.airlift.configuration.Config;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Connector configuration bound from the catalog properties.
+ * NOTE(review): all three setters are bound to the same property name "carbondata-store", so the
+ * db, table and store paths receive the same value. Confirm this is intended.
+ */
+public class CarbonTableConfig {
+    private String databasePath;
+    private String tableLocation;
+    private String storeLocation;
+
+    @NotNull
+    public String getDbPtah() {
+        return this.databasePath;
+    }
+
+    @Config("carbondata-store")
+    public CarbonTableConfig setDbPtah(String dbPtah) {
+        this.databasePath = dbPtah;
+        return this;
+    }
+
+    @NotNull
+    public String getTablePath() {
+        return this.tableLocation;
+    }
+
+    @Config("carbondata-store")
+    public CarbonTableConfig setTablePath(String tablePath) {
+        this.tableLocation = tablePath;
+        return this;
+    }
+
+    @NotNull
+    public String getStorePath() {
+        return this.storeLocation;
+    }
+
+    @Config("carbondata-store")
+    public CarbonTableConfig setStorePath(String storePath) {
+        this.storeLocation = storePath;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java
new file mode 100755
index 0000000..40bb841
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java
@@ -0,0 +1,736 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.carbondata.impl;
+
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.*;
+import org.apache.carbondata.core.datastore.block.*;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.service.impl.PathFactory;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CacheClient;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.thrift.TBase;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static java.util.Objects.requireNonNull;
+
+public class CarbonTableReader {
+    // CarbonTableReader is meant to be a facade over these CarbonData utilities:
+    //   1. CarbonMetadata      - logical table metadata
+    //   2. FileFactory         - physical table files
+    //   3. CarbonCommonFactory - (purpose not spelled out in the original note)
+    //   4. DictionaryFactory   - dictionary parsing utilities
+
+    // connector configuration; supplies the store path
+    private CarbonTableConfig config;
+    // every <schema>/<table> pair found under the store root; rebuilt by updateSchemaTables()
+    private List<SchemaTableName> tableList;
+    // store root directory, resolved lazily from config.getStorePath()
+    private CarbonFile dbStore;
+    // file type of the store path; LOCAL stores are not split (see isSplitable())
+    private FileFactory.FileType fileType;
+
+    // cache of parsed table metadata, keyed by schema-qualified table name
+    private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;
+
+    /**
+     * Creates a reader for the store described by {@code config}.
+     *
+     * @param config connector configuration; must not be null
+     * @throws NullPointerException if {@code config} is null
+     */
+    @Inject
+    public CarbonTableReader(CarbonTableConfig config) {
+        this.cc = new ConcurrentHashMap<>();
+        this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
+    }
+
+    public CarbonTableCacheModel getCarbonCache(SchemaTableName table){
+        if(!cc.containsKey(table))//for worker node to initalize carbon metastore
+        {
+            try(ThreadContextClassLoader ignored = new ThreadContextClassLoader(FileFactory.class.getClassLoader())) {
+                if(dbStore == null) {
+                    fileType = FileFactory.getFileType(config.getStorePath());
+                    try{
+                        dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+                    }catch (Exception ex){
+                        throw new RuntimeException(ex);
+                    }
+                }
+            }
+            updateSchemaTables();
+            parseCarbonMetadata(table);
+        }
+
+        if(cc.containsKey(table))
+            return cc.get(table);
+        else
+            return null;//need to reload?*/
+    }
+
+    /**
+     * Lists the schema (database) names found under the store root.
+     *
+     * @return the names produced by {@link #updateSchemaList()}
+     */
+    public List<String> getSchemaNames() {
+        List<String> schemaNames = updateSchemaList();
+        return schemaNames;
+    }
+
+    /** Default path filter: accepts only paths whose file name is a CarbonData data file. */
+    private static final PathFilter DefaultFilter =
+            path -> CarbonTablePath.isCarbonDataFile(path.getName());
+
+    public boolean updateDbStore(){
+        if(dbStore == null) {
+            fileType = FileFactory.getFileType(config.getStorePath());
+            try{
+                dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+            }catch (Exception ex){
+                throw new RuntimeException(ex);
+            }
+        }
+        return true;
+    }
+
+    public List<String> updateSchemaList() {
+        updateDbStore();
+
+        if(dbStore != null){
+            List<String> scs = Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
+            return scs;
+        }
+        else
+            return ImmutableList.of();
+    }
+
+
+    public Set<String> getTableNames(String schema) {
+        requireNonNull(schema, "schema is null");
+        return updateTableList(schema);
+    }
+
+    public Set<String> updateTableList(String dbName){
+        List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName())).collect(Collectors.toList());
+        if(schema.size() > 0)
+        {
+            return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName()).collect(Collectors.toSet());
+        }
+        else
+            return ImmutableSet.of();
+    }
+
+    /**
+     * Loads the metadata of {@code schemaTableName} after refreshing the table list.
+     *
+     * @param schemaTableName schema-qualified table name; must not be null
+     * @return the loaded table, or {@code null} if it is not present in the store
+     * @throws NullPointerException if {@code schemaTableName} is null
+     * @throws RuntimeException if the table list cannot be refreshed or metadata cannot be parsed
+     */
+    public CarbonTable getTable(SchemaTableName schemaTableName) {
+        // validate the argument before scanning the store, not after
+        requireNonNull(schemaTableName, "schemaTableName is null");
+        try {
+            updateSchemaTables();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return loadTableMetadata(schemaTableName);
+    }
+
+
+    /**
+     * Rebuilds {@code tableList} from the store layout {@code <store>/<schema>/<table>}.
+     *
+     * <p>Top-level entries whose name ends with {@code .mdt} are not treated as schemas.
+     */
+    public void updateSchemaTables() {
+        // update logic to be determined later
+        if (dbStore == null) {
+            updateSchemaList();
+        }
+
+        List<SchemaTableName> tables = new LinkedList<>();
+        tableList = tables;
+        for (CarbonFile schemaDir : dbStore.listFiles()) {
+            if (schemaDir.getName().endsWith(".mdt")) {
+                continue;
+            }
+            for (CarbonFile tableDir : schemaDir.listFiles()) {
+                tables.add(new SchemaTableName(schemaDir.getName(), tableDir.getName()));
+            }
+        }
+    }
+
+    /**
+     * Parses the metadata of the first {@code tableList} entry equal to {@code schemaTableName}.
+     *
+     * @return the parsed table, or {@code null} when the table is not in {@code tableList}
+     */
+    private CarbonTable loadTableMetadata(SchemaTableName schemaTableName) {
+        return tableList.stream()
+                .filter(table -> table.equals(schemaTableName))
+                .findFirst()
+                .map(this::parseCarbonMetadata)
+                .orElse(null);
+    }
+
+    /**
+     * Parses the CarbonData schema file of {@code table} and caches the result in {@code cc}.
+     *
+     * <p>If a valid cache entry already exists, its table is returned directly. A newly created
+     * entry is only published to {@code cc} once the schema has been read and the
+     * {@link CarbonTable} resolved, so a failed load does not leave a half-initialised entry that
+     * later cache lookups would hand out.
+     *
+     * @param table schema-qualified table name
+     * @return the loaded table
+     * @throws RuntimeException wrapping any failure while reading or converting the schema
+     */
+    public CarbonTable parseCarbonMetadata(SchemaTableName table)
+    {
+        try {
+            // NOTE(original author): this should probably live in a StoreFactory
+            CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
+            if (cache.isValid()) {
+                return cache.carbonTable;
+            }
+
+            // Step 1: resolve the table identifier and its path under the store
+            String storePath = config.getStorePath();
+            cache.carbonTableIdentifier = new CarbonTableIdentifier(table.getSchemaName(),
+                    table.getTableName(), UUID.randomUUID().toString());
+            cache.carbonTablePath = PathFactory.getInstance()
+                    .getCarbonTablePath(storePath, cache.carbonTableIdentifier);
+
+            // Step 2: read the thrift-encoded schema file, closing the reader even if read() fails
+            ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+                @Override
+                public TBase create() {
+                    return new org.apache.carbondata.format.TableInfo();
+                }
+            };
+            ThriftReader thriftReader =
+                    new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
+            org.apache.carbondata.format.TableInfo tableInfo;
+            thriftReader.open();
+            try {
+                tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+            } finally {
+                thriftReader.close();
+            }
+
+            // Convert the format-level (thrift) TableInfo into the code-level TableInfo
+            SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+            TableInfo wrapperTableInfo = schemaConverter
+                    .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(),
+                            table.getTableName(), storePath);
+            wrapperTableInfo.setMetaDataFilepath(
+                    CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
+            // Register the table with the CarbonMetadata registry
+            CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+
+            cache.tableInfo = wrapperTableInfo;
+            cache.carbonTable = CarbonMetadata.getInstance()
+                    .getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
+            // publish only after the entry is fully populated
+            cc.put(table, cache);
+            return cache.carbonTable;
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel, Expression filters) throws Exception {
+
+        //处理filter, 下推filter,将应用在Segment的索引上
+        FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
+
+        AbsoluteTableIdentifier absoluteTableIdentifier = tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
+        CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
+        List<String> invalidSegments = new ArrayList<>();
+        List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+
+        // get all valid segments and set them into the configuration
+        SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(absoluteTableIdentifier);
+        SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+        SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager.getValidAndInvalidSegments();
+
+        tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]);
+        if (segments.getValidSegments().size() == 0) {
+            return new ArrayList<>(0);
+        }
+
+        // remove entry in the segment index if there are invalid segments
+        invalidSegments.addAll(segments.getInvalidSegments());
+        for (String invalidSegmentId : invalidSegments) {
+            invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+        }
+        if (invalidSegments.size() > 0) {
+            List<TableSegmentUniqueIdentifier> invalidSegmentsIds = new ArrayList<>(invalidSegments.size());
+            for(String segId: invalidSegments) {
+                invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId));
+            }
+            cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
+        }
+
+        // get filter for segment
+        CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable);
+        FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
+
+        List<CarbonLocalInputSplit> result = new ArrayList<>();
+        //for each segment fetch blocks matching filter in Driver BTree
+        for (String segmentNo : tableCacheModel.segments) {
+            try{
+                List<DataRefNode> dataRefNodes = getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier,tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient, updateStatusManager);
+                for (DataRefNode dataRefNode : dataRefNodes) {
+                    BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
+                    TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
+
+                    if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()), updateStatusManager)) {
+                        continue;
+                    }
+                    result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
+                            tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
+                            Arrays.asList(tableBlockInfo.getLocations()), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
+                            tableBlockInfo.getVersion().number()));
+                }
+            }catch (Exception ex){
+                throw new RuntimeException(ex);
+            }
+        }
+        cacheClient.close();
+        return result;
+    }
+
+    /**
+     * Returns the data blocks of one segment, pruned by {@code resolver} when one is given.
+     *
+     * @param filterExpressionProcessor processor used to apply {@code resolver} to each index
+     * @param absoluteTableIdentifier identifier of the table being scanned
+     * @param tablePath path helper for the table
+     * @param resolver resolved filter, or {@code null} to take every block of each index
+     * @param segmentId segment to read
+     * @param cacheClient client for the segment index cache
+     * @param updateStatusManager update status of the table's segments
+     * @return matching blocks across all task indexes of the segment (empty if there is no index)
+     * @throws IOException if the segment index cannot be loaded
+     */
+    private List<DataRefNode> getDataBlocksOfSegment(FilterExpressionProcessor filterExpressionProcessor,
+                                                     AbsoluteTableIdentifier absoluteTableIdentifier,
+                                                     CarbonTablePath tablePath,
+                                                     FilterResolverIntf resolver,
+                                                     String segmentId,
+                                                     CacheClient cacheClient,
+                                                     SegmentUpdateStatusManager updateStatusManager) throws IOException {
+        // load (or fetch from the cache) the per-task indexes of this segment
+        Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> indexByTask =
+                getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
+                        updateStatusManager);
+
+        List<DataRefNode> blocks = new LinkedList<DataRefNode>();
+        if (indexByTask == null) {
+            return blocks;
+        }
+
+        for (AbstractIndex index : indexByTask.values()) {
+            // without a filter take every block of the B-tree; otherwise prune by the filter
+            List<DataRefNode> selected = (resolver == null)
+                    ? getDataBlocksOfIndex(index)
+                    : filterExpressionProcessor.getFilterredBlocks(index.getDataRefNode(), resolver,
+                            index, absoluteTableIdentifier);
+            blocks.addAll(selected);
+        }
+        return blocks;
+    }
+
+    /**
+     * Tells whether the segment was updated after its cached index was last refreshed.
+     *
+     * @return {@code true} if {@code updateDetails} has a latest-update timestamp that is newer
+     *         than the wrapper's refreshed timestamp
+     */
+    private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
+                                    UpdateVO updateDetails) {
+        Long latestUpdate = updateDetails.getLatestUpdateTimestamp();
+        return latestUpdate != null
+                && latestUpdate > segmentTaskIndexWrapper.getRefreshedTimeStamp();
+    }
+
+    /**
+     * Returns the per-task indexes of one segment, (re)building them through the cache when needed.
+     *
+     * <p>The cached index is reused unless it is missing or the segment was updated since it was
+     * refreshed. Otherwise the segment's data files are listed and split, and filtered by
+     * {@code isValidBlockBasedOnUpdateDetails}. They are then handed to the segment access cache,
+     * which builds the indexes.
+     *
+     * @return map from task/bucket to its index, as returned by the segment cache
+     * @throws IOException if listing or loading the segment fails
+     */
+    private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
+            AbsoluteTableIdentifier absoluteTableIdentifier,
+            CarbonTablePath tablePath,
+            String segmentId,
+            CacheClient cacheClient,
+            SegmentUpdateStatusManager updateStatusManager) throws IOException {
+        Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
+        boolean isSegmentUpdated = false;
+        // task keys already present in the cached index; only set when the segment was updated
+        Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
+        TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+                new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
+        SegmentTaskIndexWrapper segmentTaskIndexWrapper =
+                cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
+        UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
+        if (null != segmentTaskIndexWrapper) {
+            segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+            if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
+                taskKeys = segmentIndexMap.keySet();
+                isSegmentUpdated = true;
+            }
+        }
+
+        // if the segment tree is not loaded (or is stale), load it
+        if (segmentIndexMap == null || isSegmentUpdated) {
+            List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
+            FileSystem fs = getFileStatusOfSegments(new String[]{segmentId}, tablePath, fileStatusList);
+            List<InputSplit> splits = getSplit(fileStatusList, fs);
+
+            // drop splits whose path resolves to the invalid segment id
+            List<FileSplit> carbonSplits = new ArrayList<>();
+            for (InputSplit inputSplit : splits) {
+                FileSplit fileSplit = (FileSplit) inputSplit;
+                // NOTE(original author): unsure how the path separator should be handled here
+                String segId = CarbonTablePath.DataPathUtil.getSegmentId(fileSplit.getPath().toString());
+                if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
+                    continue;
+                }
+                carbonSplits.add(fileSplit);
+            }
+
+            List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
+            for (FileSplit inputSplit : carbonSplits) {
+                if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails, updateStatusManager, segmentId)) {
+                    // per the original author, blocklet info is not needed at this level,
+                    // so an empty placeholder is passed
+                    BlockletInfos blockletInfos = new BlockletInfos(0, 0, 0);
+                    tableBlockInfoList.add(
+                            new TableBlockInfo(inputSplit.getPath().toString(),
+                                    inputSplit.getStart(),
+                                    segmentId,
+                                    inputSplit.getLocations(),
+                                    inputSplit.getLength(),
+                                    blockletInfos,
+                                    ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION),
+                                    // NOTE(review): the original author questioned whether null is safe here
+                                    null));
+                }
+            }
+
+            Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
+            segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
+            // build (or rebuild) the B-tree blocks for this segment through the cache
+            tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
+            tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
+            segmentTaskIndexWrapper =
+                    cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
+            segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+        }
+        return segmentIndexMap;
+    }
+
+    /**
+     * Decides whether a data-file split should be indexed for the segment.
+     *
+     * <p>A null split, or one whose block the update status manager marks invalid, is rejected.
+     * With no {@code taskKeys}, every remaining split is accepted. Otherwise a split is accepted
+     * only if its task/bucket is not already indexed and the timestamp in its file name is not
+     * older than the update-delta start timestamp.
+     *
+     * @return {@code true} if the split should be indexed
+     */
+    private boolean isValidBlockBasedOnUpdateDetails(
+            Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit,
+            UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
+        if (carbonInputSplit == null) {
+            return false;
+        }
+        String fileName = carbonInputSplit.getPath().getName();
+        if (!updateStatusManager.isBlockValid(segmentId, fileName)) {
+            return false;
+        }
+        if (taskKeys == null) {
+            return true;
+        }
+
+        SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder = new SegmentTaskIndexStore.TaskBucketHolder(
+                CarbonTablePath.DataFileUtil.getTaskNo(fileName),
+                CarbonTablePath.DataFileUtil.getBucketNo(fileName));
+
+        // the block timestamp is the text between the last '-' and the last '.' of the file name
+        String blockTimestamp = fileName.substring(fileName.lastIndexOf('-') + 1, fileName.lastIndexOf('.'));
+        Long deltaStart = updateDetails.getUpdateDeltaStartTimestamp();
+        boolean olderThanDelta = deltaStart != null && Long.parseLong(blockTimestamp) < deltaStart;
+        return !olderThanDelta && !taskKeys.contains(taskBucketHolder);
+    }
+
+    /**
+     * Turns the listed data files into Hadoop file splits.
+     *
+     * <p>When {@link #isSplitable()} is true, a file is cut into chunks of its block size while more
+     * than 110% of a chunk remains, so the last chunk can be up to 10% larger. Otherwise a single
+     * split covers the whole file. An empty file yields one split with no hosts.
+     *
+     * @param fileStatusList files to split
+     * @param targetSystem file system used to look up block locations of non-located statuses
+     * @return the splits of every file, in input order
+     * @throws IOException if block locations cannot be read
+     */
+    private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem) throws IOException {
+        List<InputSplit> splits = new ArrayList<>();
+        // splittability depends only on the store's file type, so evaluate it once
+        boolean splitable = this.isSplitable();
+
+        for (FileStatus file : fileStatusList) {
+            Path path = file.getPath();
+            long length = file.getLen();
+            if (length == 0L) {
+                splits.add(this.makeSplit(path, 0L, length, new String[0]));
+                continue;
+            }
+
+            BlockLocation[] blkLocations;
+            if (file instanceof LocatedFileStatus) {
+                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+            } else {
+                blkLocations = targetSystem.getFileBlockLocations(file, 0L, length);
+            }
+
+            if (!splitable) {
+                splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts()));
+                continue;
+            }
+
+            long splitSize = this.computeSplitSize(file.getBlockSize(), 1, Long.MAX_VALUE);
+            long bytesRemaining = length;
+            // keep cutting full-size splits while more than 110% of a split remains
+            while ((double) bytesRemaining / (double) splitSize > 1.1D) {
+                int blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
+                bytesRemaining -= splitSize;
+            }
+            if (bytesRemaining != 0L) {
+                int blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts()));
+            }
+        }
+        return splits;
+    }
+
+    /**
+     * Returns the partitions to scan. Partition pruning is not implemented yet, so this is always
+     * the single partition {@code "0"}.
+     */
+    private String[] getValidPartitions() {
+        // TODO: identify partitions by partition pruning
+        String[] partitions = {"0"};
+        return partitions;
+    }
+
+    /**
+     * Collects the CarbonData data files of the given segments into {@code result}.
+     *
+     * <p>Listing is best-effort: a segment directory that cannot be listed is reported on standard
+     * error, with its path and the cause, and skipped so the remaining segments are still read.
+     *
+     * @param segmentsToConsider segment ids to list
+     * @param tablePath path helper of the table
+     * @param result receives the status of every accepted data file
+     * @return the file system of the last segment path resolved, or {@code null} if none was
+     * @throws IOException if there is no partition to scan
+     */
+    private FileSystem getFileStatusOfSegments(String[] segmentsToConsider,
+                                               CarbonTablePath tablePath,
+                                               List<FileStatus> result) throws IOException {
+        String[] partitionsToConsider = getValidPartitions();
+        if (partitionsToConsider.length == 0) {
+            throw new IOException("No partitions/data found");
+        }
+
+        FileSystem fs = null;
+        // one Hadoop configuration is reused for every segment path
+        Configuration conf = new Configuration();
+
+        // get all data files of the valid partitions and segments
+        for (String partition : partitionsToConsider) {
+            for (String segmentId : segmentsToConsider) {
+                Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
+                try {
+                    fs = segmentPath.getFileSystem(conf);
+                    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath);
+                    while (iter.hasNext()) {
+                        LocatedFileStatus stat = iter.next();
+                        if (!DefaultFilter.accept(stat.getPath())) {
+                            continue;
+                        }
+                        if (stat.isDirectory()) {
+                            addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
+                        } else {
+                            result.add(stat);
+                        }
+                    }
+                } catch (Exception ex) {
+                    // best-effort: say which segment could not be listed and why, then continue
+                    System.err.println("Failed to list segment path " + segmentPath + ": " + ex);
+                }
+            }
+        }
+        return fs;
+    }
+
+    /**
+     * Adds to {@code result} every file under {@code path} that passes {@code inputFilter},
+     * descending into accepted directories.
+     *
+     * @param result receives the status of every accepted file
+     * @param fs file system holding {@code path}
+     * @param path directory to walk
+     * @param inputFilter filter applied to every entry, directories included
+     * @throws IOException if a directory cannot be listed
+     */
+    protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path, PathFilter inputFilter) throws IOException {
+        RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+        while (iter.hasNext()) {
+            LocatedFileStatus stat = iter.next();
+            if (!inputFilter.accept(stat.getPath())) {
+                continue;
+            }
+            if (stat.isDirectory()) {
+                this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+            } else {
+                result.add(stat);
+            }
+        }
+    }
+
+    /**
+     * Returns every data block of the index's B-tree, walking the leaf chain from the first block
+     * to the last one, inclusive.
+     *
+     * @param abstractIndex index whose blocks are returned
+     * @return all blocks of the index, in leaf-chain order
+     * @throws IllegalStateException if the default start/end keys cannot be generated; returning an
+     *         empty list instead would make the scan silently skip this index's data
+     */
+    private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
+        List<DataRefNode> blocks = new LinkedList<DataRefNode>();
+        SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
+
+        try {
+            IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+            IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+
+            // add all blocks of the B-tree into the result
+            DataRefNodeFinder blockFinder =
+                    new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize());
+            DataRefNode startBlock =
+                    blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey);
+            DataRefNode endBlock =
+                    blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
+            while (startBlock != endBlock) {
+                blocks.add(startBlock);
+                startBlock = startBlock.getNextDataRefNode();
+            }
+            blocks.add(endBlock);
+        } catch (KeyGenException e) {
+            throw new IllegalStateException("Could not generate start key: " + e.getMessage(), e);
+        }
+        return blocks;
+    }
+
+    /**
+     * Tells whether data files may be cut into several splits.
+     *
+     * @return {@code false} when the store is on the local file system, {@code true} otherwise
+     */
+    private boolean isSplitable() {
+        // Don't split the file if it is local file system
+        return this.fileType != FileFactory.FileType.LOCAL;
+    }
+
+    /**
+     * Clamps {@code blockSize} into {@code [minSize, maxSize]}; {@code minSize} wins if the bounds
+     * cross.
+     */
+    private long computeSplitSize(long blockSize, long minSize,
+                                  long maxSize) {
+        long upperBounded = blockSize < maxSize ? blockSize : maxSize;
+        return upperBounded > minSize ? upperBounded : minSize;
+    }
+
+    /** Creates a split of {@code length} bytes of {@code file} starting at {@code start}, on {@code hosts}. */
+    private FileSplit makeSplit(Path file, long start, long length,
+                                String[] hosts) {
+        FileSplit split = new FileSplit(file, start, length, hosts);
+        return split;
+    }
+
+    /**
+     * Finds the block whose byte range contains {@code offset}.
+     *
+     * @param blkLocations block locations of one file
+     * @param offset byte offset within the file
+     * @return index of the first block with {@code blockOffset <= offset < blockOffset + blockLength}
+     * @throws IllegalArgumentException if no block contains {@code offset}, including when
+     *         {@code blkLocations} is empty
+     */
+    private int getBlockIndex(BlockLocation[] blkLocations,
+                              long offset) {
+        for (int i = 0; i < blkLocations.length; i++) {
+            BlockLocation block = blkLocations[i];
+            // is the offset inside this block?
+            if (block.getOffset() <= offset && offset < block.getOffset() + block.getLength()) {
+                return i;
+            }
+        }
+        if (blkLocations.length == 0) {
+            // avoid an ArrayIndexOutOfBoundsException when building the message below
+            throw new IllegalArgumentException("Offset " + offset + " is outside of file (no block locations)");
+        }
+        BlockLocation last = blkLocations[blkLocations.length - 1];
+        long fileLength = last.getOffset() + last.getLength() - 1;
+        throw new IllegalArgumentException("Offset " + offset +
+                " is outside of file (0.." +
+                fileLength + ")");
+    }
+
+
+    /**
+     * Intended to return the total number of rows of the table, for answering {@code count(*)}.
+     *
+     * <p>NOTE: not implemented yet. This method currently always returns {@code 0}; callers
+     * must not use its result as a real row count.
+     *
+     * @return always {@code 0} at present
+     * @throws IOException declared for the intended implementation; not thrown currently
+     * @throws IndexBuilderException declared for the intended implementation; not thrown
+     *     currently
+     */
+    public long getRowCount() throws IOException, IndexBuilderException {
+        // TODO: implement. One approach: load the block indexes of every segment in
+        // this.segmentList on a fixed-size thread pool and sum
+        // AbstractIndex#getTotalNumberOfRows() over all loaded indexes.
+        // The checked exceptions stay declared so existing callers that catch them still compile.
+        return 0L;
+    }
+}


Mime
View raw message