From: adi@apache.org
To: commits@drill.apache.org
Date: Tue, 13 Sep 2016 01:32:02 -0000
Message-Id: <0121cdfbcdad48d0b1e8fa1d06aaed44@git.apache.org>
In-Reply-To: <12bc6f8dff7b420fa5513c35113a14e5@git.apache.org>
References: <12bc6f8dff7b420fa5513c35113a14e5@git.apache.org>
Subject: [15/50] [abbrv] drill git commit: Adding support for Json tables.

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
index 57ef0b6..39e45f5 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
@@ -17,14 +17,9 @@
  */
 package org.apache.drill.exec.store.maprdb;
 
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -34,16 +29,14 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.hbase.HBaseSubScan;
-import org.apache.drill.exec.store.hbase.HBaseUtils;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.ParseFilter;
-import org.apache.hadoop.hbase.util.Bytes;
-import java.nio.charset.CharacterCodingException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
 
 // Class containing information for reading a single HBase region
 @JsonTypeName("maprdb-sub-scan")
@@ -54,32 +47,36 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
   public final StoragePluginConfig storage;
   @JsonIgnore
   private final FileSystemPlugin fsStoragePlugin;
-  private final List<HBaseSubScanSpec> regionScanSpecList;
+  private final List<MapRDBSubScanSpec> regionScanSpecList;
   private final List<SchemaPath> columns;
+  private final String tableType;
 
   @JsonCreator
   public MapRDBSubScan(@JacksonInject StoragePluginRegistry registry,
-      @JsonProperty("userName") String userName,
-      @JsonProperty("storage") StoragePluginConfig storage,
-      @JsonProperty("regionScanSpecList") LinkedList<HBaseSubScanSpec> regionScanSpecList,
-      @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+                       @JsonProperty("userName") String userName,
+                       @JsonProperty("storage") StoragePluginConfig storage,
+                       @JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList,
+                       @JsonProperty("columns") List<SchemaPath> columns,
+                       @JsonProperty("tableType") String tableType) throws ExecutionSetupException {
     super(userName);
     this.fsStoragePlugin = (FileSystemPlugin) registry.getPlugin(storage);
     this.regionScanSpecList = regionScanSpecList;
     this.storage = storage;
     this.columns = columns;
+    this.tableType = tableType;
   }
 
-  public MapRDBSubScan(String userName, FileSystemPlugin storagePlugin, StoragePluginConfig config,
-      List<HBaseSubScanSpec> hBaseSubScanSpecs, List<SchemaPath> columns) {
-    super(userName);
-    fsStoragePlugin = storagePlugin;
-    storage = config;
-    this.regionScanSpecList = hBaseSubScanSpecs;
-    this.columns = columns;
-  }
+  public MapRDBSubScan(String userName, FileSystemPlugin storagePlugin, StoragePluginConfig config,
+      List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) {
+    super(userName);
+    fsStoragePlugin = storagePlugin;
+    storage = config;
+    this.regionScanSpecList = maprSubScanSpecs;
+    this.columns = columns;
+    this.tableType = tableType;
+  }
 
-  public List<HBaseSubScanSpec> getRegionScanSpecList() {
+  public List<MapRDBSubScanSpec> getRegionScanSpecList() {
     return regionScanSpecList;
   }
 
@@ -100,7 +97,7 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new MapRDBSubScan(getUserName(), fsStoragePlugin, storage, regionScanSpecList, columns);
+    return new MapRDBSubScan(getUserName(), fsStoragePlugin, storage, regionScanSpecList, columns, tableType);
   }
 
   @Override
@@ -108,121 +105,13 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
     return Iterators.emptyIterator();
   }
 
-  public static class HBaseSubScanSpec {
-
-    protected String tableName;
-    protected String regionServer;
-    protected byte[] startRow;
-    protected byte[] stopRow;
-    protected byte[] serializedFilter;
-
-    @JsonCreator
-    public HBaseSubScanSpec(@JsonProperty("tableName") String tableName,
-                            @JsonProperty("regionServer") String regionServer,
-                            @JsonProperty("startRow")
byte[] startRow, - @JsonProperty("stopRow") byte[] stopRow, - @JsonProperty("serializedFilter") byte[] serializedFilter, - @JsonProperty("filterString") String filterString) { - if (serializedFilter != null && filterString != null) { - throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time."); - } - this.tableName = tableName; - this.regionServer = regionServer; - this.startRow = startRow; - this.stopRow = stopRow; - if (serializedFilter != null) { - this.serializedFilter = serializedFilter; - } else { - this.serializedFilter = HBaseUtils.serializeFilter(parseFilterString(filterString)); - } - } - - static final ParseFilter PARSE_FILTER = new ParseFilter(); - - static Filter parseFilterString(String filterString) { - if (filterString == null) { - return null; - } - try { - return PARSE_FILTER.parseFilterString(filterString); - } catch (CharacterCodingException e) { - throw new DrillRuntimeException("Error parsing filter string: " + filterString, e); - } - } - - /* package */ HBaseSubScanSpec() { - // empty constructor, to be used with builder pattern; - } - - @JsonIgnore - private Filter scanFilter; - public Filter getScanFilter() { - if (scanFilter == null && serializedFilter != null) { - scanFilter = HBaseUtils.deserializeFilter(serializedFilter); - } - return scanFilter; - } - - public String getTableName() { - return tableName; - } - - public HBaseSubScanSpec setTableName(String tableName) { - this.tableName = tableName; - return this; - } - - public String getRegionServer() { - return regionServer; - } - - public HBaseSubScanSpec setRegionServer(String regionServer) { - this.regionServer = regionServer; - return this; - } - - public byte[] getStartRow() { - return startRow; - } - - public HBaseSubScanSpec setStartRow(byte[] startRow) { - this.startRow = startRow; - return this; - } - - public byte[] getStopRow() { - return stopRow; - } - - public HBaseSubScanSpec setStopRow(byte[] stopRow) { - this.stopRow = stopRow; - return this; - } - - public byte[] getSerializedFilter() { - return serializedFilter; - } - - public HBaseSubScanSpec setSerializedFilter(byte[] serializedFilter) { - this.serializedFilter = serializedFilter; - this.scanFilter = null; - return this; - } - - @Override - public String toString() { - return "HBaseScanSpec [tableName=" + tableName - + ", startRow=" + (startRow == null ? null : Bytes.toStringBinary(startRow)) - + ", stopRow=" + (stopRow == null ? null : Bytes.toStringBinary(stopRow)) - + ", filter=" + (getScanFilter() == null ? null : getScanFilter().toString()) - + ", regionServer=" + regionServer + "]"; - } - - } - @Override public int getOperatorType() { return 1001; } + public String getTableType() { + return tableType; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java new file mode 100644 index 0000000..5e8d84c --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.maprdb; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.mapr.fs.jni.MapRConstants; +import com.mapr.org.apache.hadoop.hbase.util.Bytes; + +public class MapRDBSubScanSpec { + + protected String tableName; + protected String regionServer; + protected byte[] startRow; + protected byte[] stopRow; + protected byte[] serializedFilter; + + @parquet.org.codehaus.jackson.annotate.JsonCreator + public MapRDBSubScanSpec(@JsonProperty("tableName") String tableName, + @JsonProperty("regionServer") String regionServer, + @JsonProperty("startRow") byte[] startRow, + @JsonProperty("stopRow") byte[] stopRow, + @JsonProperty("serializedFilter") byte[] serializedFilter, + @JsonProperty("filterString") String filterString) { + if (serializedFilter != null && filterString != null) { + throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time."); + } + this.tableName = tableName; + this.regionServer = regionServer; + this.startRow = startRow; + this.stopRow = stopRow; + this.serializedFilter = serializedFilter; + } + + /* package */ MapRDBSubScanSpec() { + // empty constructor, to be used with builder pattern; + } + + public String getTableName() { + return tableName; + } + + public MapRDBSubScanSpec setTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public String getRegionServer() { + return regionServer; + } + + public MapRDBSubScanSpec setRegionServer(String regionServer) { + this.regionServer = regionServer; + return this; + } + + /** + * @return the raw (not-encoded) start row key for this sub-scan + */ + public byte[] getStartRow() { + return startRow == null ? MapRConstants.EMPTY_BYTE_ARRAY: startRow; + } + + public MapRDBSubScanSpec setStartRow(byte[] startRow) { + this.startRow = startRow; + return this; + } + + /** + * @return the raw (not-encoded) stop row key for this sub-scan + */ + public byte[] getStopRow() { + return stopRow == null ? MapRConstants.EMPTY_BYTE_ARRAY : stopRow; + } + + public MapRDBSubScanSpec setStopRow(byte[] stopRow) { + this.stopRow = stopRow; + return this; + } + + public byte[] getSerializedFilter() { + return serializedFilter; + } + + public MapRDBSubScanSpec setSerializedFilter(byte[] serializedFilter) { + this.serializedFilter = serializedFilter; + return this; + } + + @Override + public String toString() { + return "MapRDBSubScanSpec [tableName=" + tableName + + ", startRow=" + (startRow == null ? null : Bytes.toStringBinary(startRow)) + + ", stopRow=" + (stopRow == null ? null : Bytes.toStringBinary(stopRow)) + + ", filter=" + (getSerializedFilter() == null ? 
null : Bytes.toBase64(getSerializedFilter())) + + ", regionServer=" + regionServer + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java index 3d5320e..d2b1453 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java @@ -18,26 +18,25 @@ package org.apache.drill.exec.store.maprdb; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.mapr.TableMappingRulesFactory; import com.mapr.fs.hbase.HBaseAdminImpl; public class MapRDBTableStats { + private static volatile HBaseAdminImpl admin = null; private long numRows; - private volatile HBaseAdminImpl admin = null; - public MapRDBTableStats(HTable table) throws Exception { + public MapRDBTableStats(Configuration conf, String tablePath) throws Exception { if (admin == null) { synchronized (MapRDBTableStats.class) { if (admin == null) { - Configuration config = table.getConfiguration(); - admin = new HBaseAdminImpl(config, TableMappingRulesFactory.create(config)); + Configuration config = conf; + admin = new HBaseAdminImpl(config, TableMappingRulesFactory.create(conf)); } } } - numRows = admin.getNumRows(new String(table.getTableName())); + numRows = admin.getNumRows(tablePath); } public long getNumRows() { http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java new file mode 100644 index 0000000..389f00d --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.maprdb; + +import org.apache.hadoop.hbase.HRegionInfo; + +import com.mapr.db.impl.TabletInfoImpl; + +public class TabletFragmentInfo implements Comparable { + + final private HRegionInfo regionInfo; + final private TabletInfoImpl tabletInfoImpl; + + public TabletFragmentInfo(HRegionInfo regionInfo) { + this(null, regionInfo); + } + + public TabletFragmentInfo(TabletInfoImpl tabletInfoImpl) { + this(tabletInfoImpl, null); + } + + TabletFragmentInfo(TabletInfoImpl tabletInfoImpl, HRegionInfo regionInfo) { + this.regionInfo = regionInfo; + this.tabletInfoImpl = tabletInfoImpl; + } + + public HRegionInfo getRegionInfo() { + return regionInfo; + } + + public TabletInfoImpl getTabletInfoImpl() { + return tabletInfoImpl; + } + + public boolean containsRow(byte[] row) { + return tabletInfoImpl != null ? tabletInfoImpl.containsRow(row) : + regionInfo.containsRow(row); + } + + public byte[] getStartKey() { + return tabletInfoImpl != null ? tabletInfoImpl.getStartRow() : + regionInfo.getStartKey(); + } + + public byte[] getEndKey() { + return tabletInfoImpl != null ? tabletInfoImpl.getStopRow() : + regionInfo.getEndKey(); + } + + @Override + public int compareTo(TabletFragmentInfo o) { + return tabletInfoImpl != null ? tabletInfoImpl.compareTo(o.tabletInfoImpl) : + regionInfo.compareTo(o.regionInfo); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((regionInfo == null) ? 0 : regionInfo.hashCode()); + result = prime * result + ((tabletInfoImpl == null) ? 0 : tabletInfoImpl.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + TabletFragmentInfo other = (TabletFragmentInfo) obj; + if (regionInfo == null) { + if (other.regionInfo != null) + return false; + } else if (!regionInfo.equals(other.regionInfo)) + return false; + if (tabletInfoImpl == null) { + if (other.tabletInfoImpl != null) + return false; + } else if (!tabletInfoImpl.equals(other.tabletInfoImpl)) + return false; + return true; + } + + @Override + public String toString() { + return "TabletFragmentInfo [regionInfo=" + regionInfo + ", tabletInfoImpl=" + tabletInfoImpl + + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/BinaryTableGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/BinaryTableGroupScan.java new file mode 100644 index 0000000..69fda9c --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/BinaryTableGroupScan.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.maprdb.binary; + +import java.io.IOException; +import java.util.List; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; + +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; +import org.apache.drill.exec.store.hbase.DrillHBaseConstants; +import org.apache.drill.exec.store.hbase.HBaseScanSpec; +import org.apache.drill.exec.store.maprdb.MapRDBFormatPlugin; +import org.apache.drill.exec.store.maprdb.MapRDBFormatPluginConfig; +import org.apache.drill.exec.store.maprdb.MapRDBGroupScan; +import org.apache.drill.exec.store.maprdb.MapRDBSubScan; +import org.apache.drill.exec.store.maprdb.MapRDBSubScanSpec; +import org.apache.drill.exec.store.maprdb.MapRDBTableStats; +import org.apache.drill.exec.store.maprdb.TabletFragmentInfo; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HTable; +import org.codehaus.jackson.annotate.JsonCreator; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; + +@JsonTypeName("maprdb-binary-scan") +public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseConstants { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BinaryTableGroupScan.class); + + public static final String TABLE_BINARY = "binary"; + + private HBaseScanSpec hbaseScanSpec; + + private HTableDescriptor hTableDesc; + + private MapRDBTableStats tableStats; + + @JsonCreator + public BinaryTableGroupScan(@JsonProperty("userName") final String userName, + @JsonProperty("hbaseScanSpec") HBaseScanSpec scanSpec, + @JsonProperty("storage") FileSystemConfig storagePluginConfig, + @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig, + @JsonProperty("columns") List columns, + @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException { + this (userName, + (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig), + (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig), + scanSpec, columns); + } + + public BinaryTableGroupScan(String userName, FileSystemPlugin storagePlugin, + MapRDBFormatPlugin 
formatPlugin, HBaseScanSpec scanSpec, List columns) { + super(storagePlugin, formatPlugin, columns, userName); + this.hbaseScanSpec = scanSpec; + init(); + } + + /** + * Private constructor, used for cloning. + * @param that The HBaseGroupScan to clone + */ + private BinaryTableGroupScan(BinaryTableGroupScan that) { + super(that); + this.hbaseScanSpec = that.hbaseScanSpec; + this.endpointFragmentMapping = that.endpointFragmentMapping; + this.hTableDesc = that.hTableDesc; + this.tableStats = that.tableStats; + } + + @Override + public GroupScan clone(List columns) { + BinaryTableGroupScan newScan = new BinaryTableGroupScan(this); + newScan.columns = columns; + newScan.verifyColumns(); + return newScan; + } + + private void init() { + logger.debug("Getting region locations"); + try { + Configuration conf = HBaseConfiguration.create(); + HTable table = new HTable(conf, hbaseScanSpec.getTableName()); + tableStats = new MapRDBTableStats(conf, hbaseScanSpec.getTableName()); + this.hTableDesc = table.getTableDescriptor(); + NavigableMap regionsMap = table.getRegionLocations(); + table.close(); + + boolean foundStartRegion = false; + regionsToScan = new TreeMap(); + for (Entry mapEntry : regionsMap.entrySet()) { + HRegionInfo regionInfo = mapEntry.getKey(); + if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) { + continue; + } + foundStartRegion = true; + regionsToScan.put(new TabletFragmentInfo(regionInfo), mapEntry.getValue().getHostname()); + if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) { + break; + } + } + } catch (Exception e) { + throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e); + } + verifyColumns(); + } + + private void verifyColumns() { + /* + if (columns != null) { + for (SchemaPath column : columns) { + if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) { + DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .", + column.getRootSegment().getPath(), hTableDesc.getNameAsString()); + } + } + } + */ + } + + protected MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) { + HBaseScanSpec spec = hbaseScanSpec; + MapRDBSubScanSpec subScanSpec = new MapRDBSubScanSpec( + spec.getTableName(), + regionsToScan.get(tfi), + (!isNullOrEmpty(spec.getStartRow()) && tfi.containsRow(spec.getStartRow())) ? spec.getStartRow() : tfi.getStartKey(), + (!isNullOrEmpty(spec.getStopRow()) && tfi.containsRow(spec.getStopRow())) ? spec.getStopRow() : tfi.getEndKey(), + spec.getSerializedFilter(), + null); + return subScanSpec; + } + + private boolean isNullOrEmpty(byte[] key) { + return key == null || key.length == 0; + } + + @Override + public MapRDBSubScan getSpecificScan(int minorFragmentId) { + assert minorFragmentId < endpointFragmentMapping.size() : String.format( + "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(), + minorFragmentId); + return new MapRDBSubScan(getUserName(), getStoragePlugin(), getStoragePlugin().getConfig(), + endpointFragmentMapping.get(minorFragmentId), columns, TABLE_BINARY); + } + + @Override + public ScanStats getScanStats() { + //TODO: look at stats for this. + long rowCount = (long) ((hbaseScanSpec.getFilter() != null ? 
.5 : 1) * tableStats.getNumRows()); + int avgColumnSize = 10; + int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size(); + return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount); + } + + @Override + @JsonIgnore + public PhysicalOperator getNewWithChildren(List children) { + Preconditions.checkArgument(children.isEmpty()); + return new BinaryTableGroupScan(this); + } + + @JsonIgnore + public Configuration getHBaseConf() { + return HBaseConfiguration.create(); + } + + @JsonIgnore + public String getTableName() { + return getHBaseScanSpec().getTableName(); + } + + @Override + public String toString() { + return "BinaryTableGroupScan [ScanSpec=" + + hbaseScanSpec + ", columns=" + + columns + "]"; + } + + @JsonProperty + public HBaseScanSpec getHBaseScanSpec() { + return hbaseScanSpec; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/CompareFunctionsProcessor.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/CompareFunctionsProcessor.java new file mode 100644 index 0000000..f06786d --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/CompareFunctionsProcessor.java @@ -0,0 +1,547 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.maprdb.binary; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import org.apache.drill.common.expression.CastExpression; +import org.apache.drill.common.expression.ConvertExpression; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.ValueExpressions.BooleanExpression; +import org.apache.drill.common.expression.ValueExpressions.DateExpression; +import org.apache.drill.common.expression.ValueExpressions.DoubleExpression; +import org.apache.drill.common.expression.ValueExpressions.FloatExpression; +import org.apache.drill.common.expression.ValueExpressions.IntExpression; +import org.apache.drill.common.expression.ValueExpressions.LongExpression; +import org.apache.drill.common.expression.ValueExpressions.QuotedString; +import org.apache.drill.common.expression.ValueExpressions.TimeExpression; +import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression; +import org.apache.drill.common.expression.visitors.AbstractExprVisitor; +import org.apache.hadoop.hbase.util.Order; +import org.apache.hadoop.hbase.util.PositionedByteRange; +import org.apache.hadoop.hbase.util.SimplePositionedByteRange; + +import org.apache.drill.exec.store.hbase.DrillHBaseConstants; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.PrefixFilter; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +class CompareFunctionsProcessor extends AbstractExprVisitor { + private byte[] value; + private boolean success; + private boolean isEqualityFn; + private SchemaPath path; + private String functionName; + private boolean sortOrderAscending; + + // Fields for row-key prefix comparison + // If the query is on row-key prefix, we cannot use a standard template to identify startRow, stopRow and filter + // Hence, we use these local variables(set depending upon the encoding type in user query) + private boolean isRowKeyPrefixComparison; + byte[] rowKeyPrefixStartRow; + byte[] rowKeyPrefixStopRow; + Filter rowKeyPrefixFilter; + + public static boolean isCompareFunction(String functionName) { + return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName); + } + + public static CompareFunctionsProcessor process(FunctionCall call, boolean nullComparatorSupported) { + String functionName = call.getName(); + LogicalExpression nameArg = call.args.get(0); + LogicalExpression valueArg = call.args.size() >= 2 ? 
call.args.get(1) : null; + CompareFunctionsProcessor evaluator = new CompareFunctionsProcessor(functionName); + + if (valueArg != null) { // binary function + if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) { + LogicalExpression swapArg = valueArg; + valueArg = nameArg; + nameArg = swapArg; + evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName); + } + evaluator.success = nameArg.accept(evaluator, valueArg); + } else if (nullComparatorSupported && call.args.get(0) instanceof SchemaPath) { + evaluator.success = true; + evaluator.path = (SchemaPath) nameArg; + } + + return evaluator; + } + + public CompareFunctionsProcessor(String functionName) { + this.success = false; + this.functionName = functionName; + this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName) + && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(functionName); + this.isRowKeyPrefixComparison = false; + this.sortOrderAscending = true; + } + + public byte[] getValue() { + return value; + } + + public boolean isSuccess() { + return success; + } + + public SchemaPath getPath() { + return path; + } + + public String getFunctionName() { + return functionName; + } + + public boolean isRowKeyPrefixComparison() { + return isRowKeyPrefixComparison; + } + + public byte[] getRowKeyPrefixStartRow() { + return rowKeyPrefixStartRow; + } + + public byte[] getRowKeyPrefixStopRow() { + return rowKeyPrefixStopRow; + } + + public Filter getRowKeyPrefixFilter() { + return rowKeyPrefixFilter; + } + + public boolean isSortOrderAscending() { + return sortOrderAscending; + } + + @Override + public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException { + if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) { + return e.getInput().accept(this, valueArg); + } + return false; + } + + @Override + public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression valueArg) throws RuntimeException { + if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM) { + + String encodingType = e.getEncodingType(); + int prefixLength = 0; + + // Handle scan pruning in the following scenario: + // The row-key is a composite key and the CONVERT_FROM() function has byte_substr() as input function which is + // querying for the first few bytes of the row-key(start-offset 1) + // Example WHERE clause: + // CONVERT_FROM(BYTE_SUBSTR(row_key, 1, 8), 'DATE_EPOCH_BE') < DATE '2015-06-17' + if (e.getInput() instanceof FunctionCall) { + + // We can prune scan range only for big-endian encoded data + if (encodingType.endsWith("_BE") == false) { + return false; + } + + FunctionCall call = (FunctionCall)e.getInput(); + String functionName = call.getName(); + if (!functionName.equalsIgnoreCase("byte_substr")) { + return false; + } + + LogicalExpression nameArg = call.args.get(0); + LogicalExpression valueArg1 = call.args.size() >= 2 ? call.args.get(1) : null; + LogicalExpression valueArg2 = call.args.size() >= 3 ? 
call.args.get(2) : null; + + if (((nameArg instanceof SchemaPath) == false) || + (valueArg1 == null) || ((valueArg1 instanceof IntExpression) == false) || + (valueArg2 == null) || ((valueArg2 instanceof IntExpression) == false)) { + return false; + } + + boolean isRowKey = ((SchemaPath)nameArg).getAsUnescapedPath().equals(DrillHBaseConstants.ROW_KEY); + int offset = ((IntExpression)valueArg1).getInt(); + + if (!isRowKey || (offset != 1)) { + return false; + } + + this.path = (SchemaPath)nameArg; + prefixLength = ((IntExpression)valueArg2).getInt(); + this.isRowKeyPrefixComparison = true; + return visitRowKeyPrefixConvertExpression(e, prefixLength, valueArg); + } + + if (e.getInput() instanceof SchemaPath) { + ByteBuf bb = null; + + switch (encodingType) { + case "INT_BE": + case "INT": + case "UINT_BE": + case "UINT": + case "UINT4_BE": + case "UINT4": + if (valueArg instanceof IntExpression + && (isEqualityFn || encodingType.startsWith("U"))) { + bb = newByteBuf(4, encodingType.endsWith("_BE")); + bb.writeInt(((IntExpression)valueArg).getInt()); + } + break; + case "BIGINT_BE": + case "BIGINT": + case "UINT8_BE": + case "UINT8": + if (valueArg instanceof LongExpression + && (isEqualityFn || encodingType.startsWith("U"))) { + bb = newByteBuf(8, encodingType.endsWith("_BE")); + bb.writeLong(((LongExpression)valueArg).getLong()); + } + break; + case "FLOAT": + if (valueArg instanceof FloatExpression && isEqualityFn) { + bb = newByteBuf(4, true); + bb.writeFloat(((FloatExpression)valueArg).getFloat()); + } + break; + case "DOUBLE": + if (valueArg instanceof DoubleExpression && isEqualityFn) { + bb = newByteBuf(8, true); + bb.writeDouble(((DoubleExpression)valueArg).getDouble()); + } + break; + case "TIME_EPOCH": + case "TIME_EPOCH_BE": + if (valueArg instanceof TimeExpression) { + bb = newByteBuf(8, encodingType.endsWith("_BE")); + bb.writeLong(((TimeExpression)valueArg).getTime()); + } + break; + case "DATE_EPOCH": + case "DATE_EPOCH_BE": + if (valueArg instanceof DateExpression) { + bb = newByteBuf(8, encodingType.endsWith("_BE")); + bb.writeLong(((DateExpression)valueArg).getDate()); + } + break; + case "BOOLEAN_BYTE": + if (valueArg instanceof BooleanExpression) { + bb = newByteBuf(1, false /* does not matter */); + bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 
1 : 0); + } + break; + case "DOUBLE_OB": + case "DOUBLE_OBD": + if (valueArg instanceof DoubleExpression) { + bb = newByteBuf(9, true); + PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9); + if (encodingType.endsWith("_OBD")) { + org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br, + ((DoubleExpression)valueArg).getDouble(), Order.DESCENDING); + this.sortOrderAscending = false; + } else { + org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br, + ((DoubleExpression)valueArg).getDouble(), Order.ASCENDING); + } + } + break; + case "FLOAT_OB": + case "FLOAT_OBD": + if (valueArg instanceof FloatExpression) { + bb = newByteBuf(5, true); + PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5); + if (encodingType.endsWith("_OBD")) { + org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br, + ((FloatExpression)valueArg).getFloat(), Order.DESCENDING); + this.sortOrderAscending = false; + } else { + org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br, + ((FloatExpression)valueArg).getFloat(), Order.ASCENDING); + } + } + break; + case "BIGINT_OB": + case "BIGINT_OBD": + if (valueArg instanceof LongExpression) { + bb = newByteBuf(9, true); + PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9); + if (encodingType.endsWith("_OBD")) { + org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br, + ((LongExpression)valueArg).getLong(), Order.DESCENDING); + this.sortOrderAscending = false; + } else { + org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br, + ((LongExpression)valueArg).getLong(), Order.ASCENDING); + } + } + break; + case "INT_OB": + case "INT_OBD": + if (valueArg instanceof IntExpression) { + bb = newByteBuf(5, true); + PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5); + if (encodingType.endsWith("_OBD")) { + org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br, + ((IntExpression)valueArg).getInt(), Order.DESCENDING); + this.sortOrderAscending = false; + } else { + org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br, + ((IntExpression)valueArg).getInt(), Order.ASCENDING); + } + } + break; + case "UTF8_OB": + case "UTF8_OBD": + if (valueArg instanceof QuotedString) { + int stringLen = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8).length; + bb = newByteBuf(stringLen + 2, true); + PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, stringLen + 2); + if (encodingType.endsWith("_OBD")) { + org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br, + ((QuotedString)valueArg).value, Order.DESCENDING); + this.sortOrderAscending = false; + } else { + org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br, + ((QuotedString)valueArg).value, Order.ASCENDING); + } + } + break; + case "UTF8": + // let visitSchemaPath() handle this. 
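+          // e.g. CONVERT_FROM(col, 'UTF8') = 'abc': the string's bytes need no
+          // re-encoding, so we simply delegate to the SchemaPath visitor (illustrative note).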
+ return e.getInput().accept(this, valueArg); + } + + if (bb != null) { + this.value = bb.array(); + this.path = (SchemaPath)e.getInput(); + return true; + } + } + } + return false; + } + + private Boolean visitRowKeyPrefixConvertExpression(ConvertExpression e, + int prefixLength, LogicalExpression valueArg) { + String encodingType = e.getEncodingType(); + rowKeyPrefixStartRow = HConstants.EMPTY_START_ROW; + rowKeyPrefixStopRow = HConstants.EMPTY_START_ROW; + rowKeyPrefixFilter = null; + + if ((encodingType.compareTo("UINT4_BE") == 0) || + (encodingType.compareTo("UINT_BE") == 0)) { + if (prefixLength != 4) { + throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix"); + } + + int val; + if ((valueArg instanceof IntExpression) == false) { + return false; + } + + val = ((IntExpression)valueArg).getInt(); + + // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >= + switch (functionName) { + case "equal": + rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(4).putInt(val).array()); + rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array(); + rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array(); + return true; + case "greater_than_or_equal_to": + rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array(); + return true; + case "greater_than": + rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val + 1).array(); + return true; + case "less_than_or_equal_to": + rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array(); + return true; + case "less_than": + rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val).array(); + return true; + } + + return false; + } + + if ((encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) || + (encodingType.compareTo("TIME_EPOCH_BE") == 0) || + (encodingType.compareTo("UINT8_BE") == 0)) { + + if (prefixLength != 8) { + throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix"); + } + + long val; + if (encodingType.compareTo("TIME_EPOCH_BE") == 0) { + if ((valueArg instanceof TimeExpression) == false) { + return false; + } + + val = ((TimeExpression)valueArg).getTime(); + } else if (encodingType.compareTo("UINT8_BE") == 0){ + if ((valueArg instanceof LongExpression) == false) { + return false; + } + + val = ((LongExpression)valueArg).getLong(); + } else if (encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) { + if ((valueArg instanceof TimeStampExpression) == false) { + return false; + } + + val = ((TimeStampExpression)valueArg).getTimeStamp(); + } else { + // Should not reach here. 
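+        // (unreachable in practice: encodingType was already matched against
+        // the three *_BE cases guarding this block)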
+ return false; + } + + // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >= + switch (functionName) { + case "equal": + rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(8).putLong(val).array()); + rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array(); + rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array(); + return true; + case "greater_than_or_equal_to": + rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array(); + return true; + case "greater_than": + rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val + 1).array(); + return true; + case "less_than_or_equal_to": + rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array(); + return true; + case "less_than": + rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val).array(); + return true; + } + + return false; + } + + if (encodingType.compareTo("DATE_EPOCH_BE") == 0) { + if ((valueArg instanceof DateExpression) == false) { + return false; + } + + if (prefixLength != 8) { + throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix"); + } + + final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24; + long dateToSet; + // For DATE encoding, the operators that we push-down are =, <>, <, <=, >, >= + switch (functionName) { + case "equal": + long startDate = ((DateExpression)valueArg).getDate(); + rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(startDate).array(); + long stopDate = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY; + rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(stopDate).array(); + return true; + case "greater_than_or_equal_to": + dateToSet = ((DateExpression)valueArg).getDate(); + rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array(); + return true; + case "greater_than": + dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY; + rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array(); + return true; + case "less_than_or_equal_to": + dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY; + rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array(); + return true; + case "less_than": + dateToSet = ((DateExpression)valueArg).getDate(); + rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array(); + return true; + } + + return false; + } + + return false; + } + + @Override + public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException { + return false; + } + + @Override + public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException { + if (valueArg instanceof QuotedString) { + this.value = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8); + this.path = path; + return true; + } + return false; + } + + private static ByteBuf newByteBuf(int size, boolean bigEndian) { + return Unpooled.wrappedBuffer(new byte[size]) + .order(bigEndian ? 
ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN)
+        .writerIndex(0);
+  }
+
+  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+  static {
+    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+    VALUE_EXPRESSION_CLASSES = builder
+        .add(BooleanExpression.class)
+        .add(DateExpression.class)
+        .add(DoubleExpression.class)
+        .add(FloatExpression.class)
+        .add(IntExpression.class)
+        .add(LongExpression.class)
+        .add(QuotedString.class)
+        .add(TimeExpression.class)
+        .build();
+  }
+
+  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+  static {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+        // unary functions
+        .put("isnotnull", "isnotnull")
+        .put("isNotNull", "isNotNull")
+        .put("is not null", "is not null")
+        .put("isnull", "isnull")
+        .put("isNull", "isNull")
+        .put("is null", "is null")
+        // binary functions
+        .put("like", "like")
+        .put("equal", "equal")
+        .put("not_equal", "not_equal")
+        .put("greater_than_or_equal_to", "less_than_or_equal_to")
+        .put("greater_than", "less_than")
+        .put("less_than_or_equal_to", "greater_than_or_equal_to")
+        .put("less_than", "greater_than")
+        .build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBFilterBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBFilterBuilder.java
new file mode 100644
index 0000000..800d155
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBFilterBuilder.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.maprdb.binary;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
+import org.apache.drill.exec.store.hbase.HBaseRegexParser;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.drill.exec.store.hbase.HBaseUtils;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.NullComparator;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+
+public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
+
+  final private BinaryTableGroupScan groupScan;
+
+  final private LogicalExpression le;
+
+  private boolean allExpressionsConverted = true;
+
+  private static Boolean nullComparatorSupported = null;
+
+  MapRDBFilterBuilder(BinaryTableGroupScan groupScan, LogicalExpression le) {
+    this.groupScan = groupScan;
+    this.le = le;
+  }
+
+  public HBaseScanSpec parseTree() {
+    HBaseScanSpec parsedSpec = le.accept(this, null);
+    if (parsedSpec != null) {
+      parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), parsedSpec);
+      /*
+       * If RowFilter is THE filter attached to the scan specification,
+       * remove it since its effect is also achieved through startRow and stopRow.
+       */
+      Filter filter = parsedSpec.getFilter();
+      if (filter instanceof RowFilter &&
+          ((RowFilter)filter).getOperator() != CompareOp.NOT_EQUAL &&
+          ((RowFilter)filter).getComparator() instanceof BinaryComparator) {
+        parsedSpec = new HBaseScanSpec(parsedSpec.getTableName(), parsedSpec.getStartRow(), parsedSpec.getStopRow(), null);
+      }
+    }
+    return parsedSpec;
+  }
+
+  public boolean isAllExpressionsConverted() {
+    return allExpressionsConverted;
+  }
+
+  @Override
+  public HBaseScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
+    allExpressionsConverted = false;
+    return null;
+  }
+
+  @Override
+  public HBaseScanSpec visitBooleanOperator(BooleanOperator op, Void value) throws RuntimeException {
+    return visitFunctionCall(op, value);
+  }
+
+  @Override
+  public HBaseScanSpec visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
+    HBaseScanSpec nodeScanSpec = null;
+    String functionName = call.getName();
+    ImmutableList<LogicalExpression> args = call.args;
+
+    if (CompareFunctionsProcessor.isCompareFunction(functionName)) {
+      /*
+       * HBASE-10848: Bug in HBase versions (0.94.[0-18], 0.96.[0-2], 0.98.[0-1])
+       * causes a filter with NullComparator to fail. Enable only if specified in
+       * the configuration (after ensuring that the HBase cluster has the fix).
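+       * For example (illustrative; requires a patched cluster), an administrator
+       * might set in the HBase site configuration:
+       *   drill.hbase.supports.null.comparator = true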
+ */ + if (nullComparatorSupported == null) { + nullComparatorSupported = groupScan.getHBaseConf().getBoolean("drill.hbase.supports.null.comparator", false); + } + + CompareFunctionsProcessor processor = CompareFunctionsProcessor.process(call, nullComparatorSupported); + if (processor.isSuccess()) { + nodeScanSpec = createHBaseScanSpec(call, processor); + } + } else { + switch (functionName) { + case "booleanAnd": + case "booleanOr": + HBaseScanSpec firstScanSpec = args.get(0).accept(this, null); + for (int i = 1; i < args.size(); ++i) { + HBaseScanSpec nextScanSpec = args.get(i).accept(this, null); + if (firstScanSpec != null && nextScanSpec != null) { + nodeScanSpec = mergeScanSpecs(functionName, firstScanSpec, nextScanSpec); + } else { + allExpressionsConverted = false; + if ("booleanAnd".equals(functionName)) { + nodeScanSpec = firstScanSpec == null ? nextScanSpec : firstScanSpec; + } + } + firstScanSpec = nodeScanSpec; + } + break; + } + } + + if (nodeScanSpec == null) { + allExpressionsConverted = false; + } + + return nodeScanSpec; + } + + private HBaseScanSpec mergeScanSpecs(String functionName, HBaseScanSpec leftScanSpec, HBaseScanSpec rightScanSpec) { + Filter newFilter = null; + byte[] startRow = HConstants.EMPTY_START_ROW; + byte[] stopRow = HConstants.EMPTY_END_ROW; + + switch (functionName) { + case "booleanAnd": + newFilter = HBaseUtils.andFilterAtIndex(leftScanSpec.getFilter(), -1, rightScanSpec.getFilter()); //HBaseUtils.LAST_FILTER + startRow = HBaseUtils.maxOfStartRows(leftScanSpec.getStartRow(), rightScanSpec.getStartRow()); + stopRow = HBaseUtils.minOfStopRows(leftScanSpec.getStopRow(), rightScanSpec.getStopRow()); + break; + case "booleanOr": + newFilter = HBaseUtils.orFilterAtIndex(leftScanSpec.getFilter(), -1, rightScanSpec.getFilter()); //HBaseUtils.LAST_FILTER + startRow = HBaseUtils.minOfStartRows(leftScanSpec.getStartRow(), rightScanSpec.getStartRow()); + stopRow = HBaseUtils.maxOfStopRows(leftScanSpec.getStopRow(), rightScanSpec.getStopRow()); + } + return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, newFilter); + } + + private HBaseScanSpec createHBaseScanSpec(FunctionCall call, CompareFunctionsProcessor processor) { + String functionName = processor.getFunctionName(); + SchemaPath field = processor.getPath(); + byte[] fieldValue = processor.getValue(); + boolean sortOrderAscending = processor.isSortOrderAscending(); + boolean isRowKey = field.getAsUnescapedPath().equals(ROW_KEY); + if (!(isRowKey + || (!field.getRootSegment().isLastPath() + && field.getRootSegment().getChild().isLastPath() + && field.getRootSegment().getChild().isNamed()) + ) + ) { + /* + * if the field in this function is neither the row_key nor a qualified HBase column, return. 
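+     * A qualified HBase column here is a two-part path such as `f`.`c`
+     * (column family plus qualifier); anything else cannot be mapped onto
+     * an HBase filter.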
+ */ + return null; + } + + if (processor.isRowKeyPrefixComparison()) { + return createRowKeyPrefixScanSpec(call, processor); + } + + CompareOp compareOp = null; + boolean isNullTest = false; + ByteArrayComparable comparator = new BinaryComparator(fieldValue); + byte[] startRow = HConstants.EMPTY_START_ROW; + byte[] stopRow = HConstants.EMPTY_END_ROW; + switch (functionName) { + case "equal": + compareOp = CompareOp.EQUAL; + if (isRowKey) { + startRow = fieldValue; + /* stopRow should be just greater than 'value'*/ + stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1); + compareOp = CompareOp.EQUAL; + } + break; + case "not_equal": + compareOp = CompareOp.NOT_EQUAL; + break; + case "greater_than_or_equal_to": + if (sortOrderAscending) { + compareOp = CompareOp.GREATER_OR_EQUAL; + if (isRowKey) { + startRow = fieldValue; + } + } else { + compareOp = CompareOp.LESS_OR_EQUAL; + if (isRowKey) { + // stopRow should be just greater than 'value' + stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1); + } + } + break; + case "greater_than": + if (sortOrderAscending) { + compareOp = CompareOp.GREATER; + if (isRowKey) { + // startRow should be just greater than 'value' + startRow = Arrays.copyOf(fieldValue, fieldValue.length+1); + } + } else { + compareOp = CompareOp.LESS; + if (isRowKey) { + stopRow = fieldValue; + } + } + break; + case "less_than_or_equal_to": + if (sortOrderAscending) { + compareOp = CompareOp.LESS_OR_EQUAL; + if (isRowKey) { + // stopRow should be just greater than 'value' + stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1); + } + } else { + compareOp = CompareOp.GREATER_OR_EQUAL; + if (isRowKey) { + startRow = fieldValue; + } + } + break; + case "less_than": + if (sortOrderAscending) { + compareOp = CompareOp.LESS; + if (isRowKey) { + stopRow = fieldValue; + } + } else { + compareOp = CompareOp.GREATER; + if (isRowKey) { + // startRow should be just greater than 'value' + startRow = Arrays.copyOf(fieldValue, fieldValue.length+1); + } + } + break; + case "isnull": + case "isNull": + case "is null": + if (isRowKey) { + return null; + } + isNullTest = true; + compareOp = CompareOp.EQUAL; + comparator = new NullComparator(); + break; + case "isnotnull": + case "isNotNull": + case "is not null": + if (isRowKey) { + return null; + } + compareOp = CompareOp.NOT_EQUAL; + comparator = new NullComparator(); + break; + case "like": + /* + * Convert the LIKE operand to Regular Expression pattern so that we can + * apply RegexStringComparator() + */ + HBaseRegexParser parser = new HBaseRegexParser(call).parse(); + compareOp = CompareOp.EQUAL; + comparator = new RegexStringComparator(parser.getRegexString()); + + /* + * We can possibly do better if the LIKE operator is on the row_key + */ + if (isRowKey) { + String prefix = parser.getPrefixString(); + if (prefix != null) { // group 3 is literal + /* + * If there is a literal prefix, it can help us prune the scan to a sub range + */ + if (prefix.equals(parser.getLikeString())) { + /* The operand value is literal. 
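(e.g. LIKE 'abc' with no wildcards; illustrative example).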
+             * This turns the LIKE operator to EQUAL operator */
+            startRow = stopRow = fieldValue;
+            compareOp = null;
+          } else {
+            startRow = prefix.getBytes(Charsets.UTF_8);
+            stopRow = startRow.clone();
+            boolean isMaxVal = true;
+            for (int i = stopRow.length - 1; i >= 0; --i) {
+              int nextByteValue = (0xff & stopRow[i]) + 1;
+              if (nextByteValue <= 0xff) {
+                stopRow[i] = (byte) nextByteValue;
+                isMaxVal = false;
+                break;
+              } else {
+                stopRow[i] = 0;
+              }
+            }
+            if (isMaxVal) {
+              stopRow = HConstants.EMPTY_END_ROW;
+            }
+          }
+        }
+      }
+      break;
+    }
+
+    if (compareOp != null || startRow != HConstants.EMPTY_START_ROW || stopRow != HConstants.EMPTY_END_ROW) {
+      Filter filter = null;
+      if (isRowKey) {
+        if (compareOp != null) {
+          filter = new RowFilter(compareOp, comparator);
+        }
+      } else {
+        byte[] family = HBaseUtils.getBytes(field.getRootSegment().getPath());
+        byte[] qualifier = HBaseUtils.getBytes(field.getRootSegment().getChild().getNameSegment().getPath());
+        filter = new SingleColumnValueFilter(family, qualifier, compareOp, comparator);
+        ((SingleColumnValueFilter)filter).setLatestVersionOnly(true);
+        if (!isNullTest) {
+          ((SingleColumnValueFilter)filter).setFilterIfMissing(true);
+        }
+      }
+      return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
+    }
+    // else
+    return null;
+  }
+
+  private HBaseScanSpec createRowKeyPrefixScanSpec(FunctionCall call,
+      CompareFunctionsProcessor processor) {
+    byte[] startRow = processor.getRowKeyPrefixStartRow();
+    byte[] stopRow = processor.getRowKeyPrefixStopRow();
+    Filter filter = processor.getRowKeyPrefixFilter();
+
+    if (startRow != HConstants.EMPTY_START_ROW ||
+        stopRow != HConstants.EMPTY_END_ROW ||
+        filter != null) {
+      return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
+    }
+
+    // else
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBPushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBPushFilterIntoScan.java
new file mode 100644
index 0000000..5adff38
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBPushFilterIntoScan.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.maprdb.binary;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rex.RexNode;
+
+import com.google.common.collect.ImmutableList;
+
+public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class);
+
+  private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+
+  public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan:Filter_On_Scan") {
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final ScanPrel scan = (ScanPrel) call.rel(1);
+      final FilterPrel filter = (FilterPrel) call.rel(0);
+      final RexNode condition = filter.getCondition();
+
+      BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan();
+      if (groupScan.isFilterPushedDown()) {
+        /*
+         * The rule can get triggered again due to the transformed "scan => filter" sequence
+         * created by the earlier execution of this rule when we could not do a complete
+         * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
+         * this flag to not do a re-processing of the rule on the already transformed call.
+         */
+        return;
+      }
+
+      doPushFilterToScan(call, filter, null, scan, groupScan, condition);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final ScanPrel scan = (ScanPrel) call.rel(1);
+      if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
+        return super.matches(call);
+      }
+      return false;
+    }
+  };
+
+  public static final StoragePluginOptimizerRule FILTER_ON_PROJECT = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))), "MapRDBPushFilterIntoScan:Filter_On_Project") {
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final ScanPrel scan = (ScanPrel) call.rel(2);
+      final ProjectPrel project = (ProjectPrel) call.rel(1);
+      final FilterPrel filter = (FilterPrel) call.rel(0);
+
+      BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan();
+      if (groupScan.isFilterPushedDown()) {
+        /*
+         * The rule can get triggered again due to the transformed "scan => filter" sequence
+         * created by the earlier execution of this rule when we could not do a complete
+         * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
+         * this flag to not do a re-processing of the rule on the already transformed call.
+         */
+        return;
+      }
+
+      // convert the filter to one that references the child of the project
+      final RexNode condition = RelOptUtil.pushFilterPastProject(filter.getCondition(), project);
+
+      doPushFilterToScan(call, filter, project, scan, groupScan, condition);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final ScanPrel scan = (ScanPrel) call.rel(2);
+      if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
+        return super.matches(call);
+      }
+      return false;
+    }
+  };
+
+  protected void doPushFilterToScan(final RelOptRuleCall call, final FilterPrel filter, final ProjectPrel project, final ScanPrel scan, final BinaryTableGroupScan groupScan, final RexNode condition) {
+
+    final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+    final MapRDBFilterBuilder maprdbFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp);
+    final HBaseScanSpec newScanSpec = maprdbFilterBuilder.parseTree();
+    if (newScanSpec == null) {
+      return; // no filter pushdown ==> no transformation
+    }
+
+    final BinaryTableGroupScan newGroupsScan = new BinaryTableGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
+        groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns());
+    newGroupsScan.setFilterPushedDown(true);
+
+    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
+
+    // Depending on whether there is a project in the middle, assign either the scan or a copy of the project to childRel.
+    final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));
+
+    if (maprdbFilterBuilder.isAllExpressionsConverted()) {
+      /*
+       * Since we could convert the entire filter condition expression into an HBase filter,
+       * we can eliminate the filter operator altogether.
+       */
+      call.transformTo(childRel);
+    } else {
+      call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonTableGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonTableGroupScan.java
new file mode 100644
index 0000000..e798c52
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonTableGroupScan.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.maprdb.json;
+
+import static org.apache.drill.exec.store.maprdb.util.CommonFns.isNullOrEmpty;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.maprdb.MapRDBFormatPlugin;
+import org.apache.drill.exec.store.maprdb.MapRDBFormatPluginConfig;
+import org.apache.drill.exec.store.maprdb.MapRDBGroupScan;
+import org.apache.drill.exec.store.maprdb.MapRDBSubScan;
+import org.apache.drill.exec.store.maprdb.MapRDBSubScanSpec;
+import org.apache.drill.exec.store.maprdb.MapRDBTableStats;
+import org.apache.drill.exec.store.maprdb.TabletFragmentInfo;
+import org.apache.hadoop.conf.Configuration;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.mapr.db.MapRDB;
+import com.mapr.db.Table;
+import com.mapr.db.TabletInfo;
+import com.mapr.db.impl.TabletInfoImpl;
+
+@JsonTypeName("maprdb-json-scan")
+public class JsonTableGroupScan extends MapRDBGroupScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonTableGroupScan.class);
+
+  public static final String TABLE_JSON = "json";
+
+  private MapRDBTableStats tableStats;
+
+  private MapRDBSubScanSpec subscanSpec;
+
+  @JsonCreator
+  public JsonTableGroupScan(@JsonProperty("userName") final String userName,
+                            @JsonProperty("subscanSpec") MapRDBSubScanSpec subscanSpec,
+                            @JsonProperty("storage") FileSystemConfig storagePluginConfig,
+                            @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig,
+                            @JsonProperty("columns") List<SchemaPath> columns,
+                            @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
+    this(userName,
+         (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
+         (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
+         subscanSpec, columns);
+  }
+
+  public JsonTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+                            MapRDBFormatPlugin formatPlugin, MapRDBSubScanSpec subscanSpec, List<SchemaPath> columns) {
+    super(storagePlugin, formatPlugin, columns, userName);
+    this.subscanSpec = subscanSpec;
+    init();
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * @param that the JsonTableGroupScan to clone
+   */
+  private JsonTableGroupScan(JsonTableGroupScan that) {
+    super(that);
+    this.subscanSpec = that.subscanSpec;
+    this.endpointFragmentMapping = that.endpointFragmentMapping;
+    this.tableStats = that.tableStats;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    JsonTableGroupScan newScan = new JsonTableGroupScan(this);
+    newScan.columns = columns;
+    return newScan;
+  }
+
+  private void init() {
+    logger.debug("Getting tablet locations");
+    try {
+      Configuration conf = new Configuration();
+      Table t = MapRDB.getTable(subscanSpec.getTableName());
+      TabletInfo[] tabletInfos = t.getTabletInfos();
+      tableStats = new MapRDBTableStats(conf, subscanSpec.getTableName());
+
+      boolean foundStartRegion = false;
+      regionsToScan = new TreeMap<TabletFragmentInfo, String>();
+      for (TabletInfo tabletInfo : tabletInfos) {
+        TabletInfoImpl tabletInfoImpl = (TabletInfoImpl) tabletInfo;
+        if (!foundStartRegion
+            && !isNullOrEmpty(subscanSpec.getStartRow())
+            && !tabletInfoImpl.containsRow(subscanSpec.getStartRow())) {
+          continue;
+        }
+        foundStartRegion = true;
+        regionsToScan.put(new TabletFragmentInfo(tabletInfoImpl), tabletInfo.getLocations()[0]);
+        if (!isNullOrEmpty(subscanSpec.getStopRow())
+            && tabletInfoImpl.containsRow(subscanSpec.getStopRow())) {
+          break;
+        }
+      }
+    } catch (Exception e) {
+      throw new DrillRuntimeException("Error getting region info for table: " + subscanSpec.getTableName(), e);
+    }
+  }
+
+  protected MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
+    MapRDBSubScanSpec spec = subscanSpec;
+    MapRDBSubScanSpec subScanSpec = new MapRDBSubScanSpec(
+        spec.getTableName(),
+        regionsToScan.get(tfi),
+        (!isNullOrEmpty(spec.getStartRow()) && tfi.containsRow(spec.getStartRow())) ? spec.getStartRow() : tfi.getStartKey(),
+        (!isNullOrEmpty(spec.getStopRow()) && tfi.containsRow(spec.getStopRow())) ? spec.getStopRow() : tfi.getEndKey(),
+        spec.getSerializedFilter(),
+        null);
+    return subScanSpec;
+  }
+
+  @Override
+  public MapRDBSubScan getSpecificScan(int minorFragmentId) {
+    assert minorFragmentId < endpointFragmentMapping.size() : String.format(
+        "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
+        minorFragmentId);
+    return new MapRDBSubScan(getUserName(), getStoragePlugin(), getStoragePlugin().getConfig(),
+        endpointFragmentMapping.get(minorFragmentId), columns, TABLE_JSON);
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    // TODO: look at stats for this.
+    long rowCount = (long) ((subscanSpec.getSerializedFilter() != null ? .5 : 1) * tableStats.getNumRows());
+    int avgColumnSize = 10;
+    int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new JsonTableGroupScan(this);
+  }
+
+  @JsonIgnore
+  public String getTableName() {
+    return subscanSpec.getTableName();
+  }
+
+  @Override
+  public String toString() {
+    return "JsonTableGroupScan [ScanSpec=" + subscanSpec + ", columns=" + columns + "]";
+  }
+
+  public MapRDBSubScanSpec getSubscanSpec() {
+    return subscanSpec;
+  }
+
+}
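For reference, the LIKE pushdown in createHBaseScanSpec() above prunes the row-key scan range to [prefix, just-after-prefix) whenever the pattern has a literal prefix, while the RegexStringComparator filter still weeds out non-matching rows inside that range. The following is a minimal, self-contained sketch of just the stop-row computation; the class and method names are illustrative only and are not part of this commit.

import java.nio.charset.StandardCharsets;

public class LikePrefixPruningSketch {

  // Smallest row key that sorts after every key starting with 'prefix';
  // an empty array (HBase's EMPTY_END_ROW) when the prefix is all 0xff
  // bytes and no such key exists.
  static byte[] stopRowForPrefix(byte[] prefix) {
    byte[] stopRow = prefix.clone();
    for (int i = stopRow.length - 1; i >= 0; --i) {
      int nextByteValue = (0xff & stopRow[i]) + 1;
      if (nextByteValue <= 0xff) { // room to bump this byte
        stopRow[i] = (byte) nextByteValue;
        return stopRow;
      }
      stopRow[i] = 0; // 0xff overflows: zero-fill and carry into the byte to the left
    }
    return new byte[0]; // every byte was 0xff: scan to the end of the table
  }

  public static void main(String[] args) {
    // WHERE row_key LIKE 'row-07%' ==> scan ["row-07", "row-08") plus the regex filter
    byte[] stop = stopRowForPrefix("row-07".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(stop, StandardCharsets.UTF_8)); // prints row-08
  }
}

Note that the zero-fill-and-carry step can leave the bound slightly wider than strictly necessary (for example, the stop row for prefix 0x41 0xff comes out as 0x42 0x00 rather than 0x42); this is safe because the regex filter is still evaluated for every row in the range.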