drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [15/50] [abbrv] drill git commit: Adding support for Json tables.
Date Tue, 13 Sep 2016 01:32:02 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
index 57ef0b6..39e45f5 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
@@ -17,14 +17,9 @@
  */
 package org.apache.drill.exec.store.maprdb;
 
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -34,16 +29,14 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.hbase.HBaseSubScan;
-import org.apache.drill.exec.store.hbase.HBaseUtils;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.ParseFilter;
-import org.apache.hadoop.hbase.util.Bytes;
 
-import java.nio.charset.CharacterCodingException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
 
 // Class containing information for reading a single HBase region
 @JsonTypeName("maprdb-sub-scan")
@@ -54,32 +47,36 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
   public final StoragePluginConfig storage;
   @JsonIgnore
   private final FileSystemPlugin fsStoragePlugin;
-  private final List<HBaseSubScan.HBaseSubScanSpec> regionScanSpecList;
+  private final List<MapRDBSubScanSpec> regionScanSpecList;
   private final List<SchemaPath> columns;
+  private final String tableType;
 
   @JsonCreator
   public MapRDBSubScan(@JacksonInject StoragePluginRegistry registry,
-                      @JsonProperty("userName") String userName,
-                      @JsonProperty("storage") StoragePluginConfig storage,
-                      @JsonProperty("regionScanSpecList") LinkedList<HBaseSubScan.HBaseSubScanSpec> regionScanSpecList,
-                      @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+                       @JsonProperty("userName") String userName,
+                       @JsonProperty("storage") StoragePluginConfig storage,
+                       @JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList,
+                       @JsonProperty("columns") List<SchemaPath> columns,
+                       @JsonProperty("tableType") String tableType) throws ExecutionSetupException {
     super(userName);
     this.fsStoragePlugin = (FileSystemPlugin) registry.getPlugin(storage);
     this.regionScanSpecList = regionScanSpecList;
     this.storage = storage;
     this.columns = columns;
+    this.tableType = tableType;
   }
 
-    public MapRDBSubScan(String userName, FileSystemPlugin storagePlugin, StoragePluginConfig config,
-                         List<HBaseSubScan.HBaseSubScanSpec> hBaseSubScanSpecs, List<SchemaPath> columns) {
-        super(userName);
-        fsStoragePlugin = storagePlugin;
-        storage = config;
-        this.regionScanSpecList = hBaseSubScanSpecs;
-        this.columns = columns;
-    }
+  public MapRDBSubScan(String userName, FileSystemPlugin storagePlugin, StoragePluginConfig config,
+      List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) {
+    super(userName);
+    fsStoragePlugin = storagePlugin;
+    storage = config;
+    this.regionScanSpecList = maprSubScanSpecs;
+    this.columns = columns;
+    this.tableType = tableType;
+  }
 
-    public List<HBaseSubScan.HBaseSubScanSpec> getRegionScanSpecList() {
+  public List<MapRDBSubScanSpec> getRegionScanSpecList() {
     return regionScanSpecList;
   }
 
@@ -100,7 +97,7 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new MapRDBSubScan(getUserName(), fsStoragePlugin, storage, regionScanSpecList, columns);
+    return new MapRDBSubScan(getUserName(), fsStoragePlugin, storage, regionScanSpecList, columns, tableType);
   }
 
   @Override
@@ -108,121 +105,13 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
     return Iterators.emptyIterator();
   }
 
-  public static class HBaseSubScanSpec {
-
-    protected String tableName;
-    protected String regionServer;
-    protected byte[] startRow;
-    protected byte[] stopRow;
-    protected byte[] serializedFilter;
-
-    @JsonCreator
-    public HBaseSubScanSpec(@JsonProperty("tableName") String tableName,
-                            @JsonProperty("regionServer") String regionServer,
-                            @JsonProperty("startRow") byte[] startRow,
-                            @JsonProperty("stopRow") byte[] stopRow,
-                            @JsonProperty("serializedFilter") byte[] serializedFilter,
-                            @JsonProperty("filterString") String filterString) {
-      if (serializedFilter != null && filterString != null) {
-        throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time.");
-      }
-      this.tableName = tableName;
-      this.regionServer = regionServer;
-      this.startRow = startRow;
-      this.stopRow = stopRow;
-      if (serializedFilter != null) {
-        this.serializedFilter = serializedFilter;
-      } else {
-        this.serializedFilter = HBaseUtils.serializeFilter(parseFilterString(filterString));
-      }
-    }
-
-    static final ParseFilter PARSE_FILTER = new ParseFilter();
-
-    static Filter parseFilterString(String filterString) {
-      if (filterString == null) {
-        return null;
-      }
-      try {
-        return PARSE_FILTER.parseFilterString(filterString);
-      } catch (CharacterCodingException e) {
-        throw new DrillRuntimeException("Error parsing filter string: " + filterString, e);
-      }
-    }
-
-    /* package */ HBaseSubScanSpec() {
-      // empty constructor, to be used with builder pattern;
-    }
-
-    @JsonIgnore
-    private Filter scanFilter;
-    public Filter getScanFilter() {
-      if (scanFilter == null &&  serializedFilter != null) {
-          scanFilter = HBaseUtils.deserializeFilter(serializedFilter);
-      }
-      return scanFilter;
-    }
-
-    public String getTableName() {
-      return tableName;
-    }
-
-    public HBaseSubScanSpec setTableName(String tableName) {
-      this.tableName = tableName;
-      return this;
-    }
-
-    public String getRegionServer() {
-      return regionServer;
-    }
-
-    public HBaseSubScanSpec setRegionServer(String regionServer) {
-      this.regionServer = regionServer;
-      return this;
-    }
-
-    public byte[] getStartRow() {
-      return startRow;
-    }
-
-    public HBaseSubScanSpec setStartRow(byte[] startRow) {
-      this.startRow = startRow;
-      return this;
-    }
-
-    public byte[] getStopRow() {
-      return stopRow;
-    }
-
-    public HBaseSubScanSpec setStopRow(byte[] stopRow) {
-      this.stopRow = stopRow;
-      return this;
-    }
-
-    public byte[] getSerializedFilter() {
-      return serializedFilter;
-    }
-
-    public HBaseSubScanSpec setSerializedFilter(byte[] serializedFilter) {
-      this.serializedFilter = serializedFilter;
-      this.scanFilter = null;
-      return this;
-    }
-
-    @Override
-    public String toString() {
-      return "HBaseScanSpec [tableName=" + tableName
-          + ", startRow=" + (startRow == null ? null : Bytes.toStringBinary(startRow))
-          + ", stopRow=" + (stopRow == null ? null : Bytes.toStringBinary(stopRow))
-          + ", filter=" + (getScanFilter() == null ? null : getScanFilter().toString())
-          + ", regionServer=" + regionServer + "]";
-    }
-
-  }
-
   @Override
   public int getOperatorType() {
     return 1001;
   }
 
+  public String getTableType() {
+    return tableType;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java
new file mode 100644
index 0000000..5e8d84c
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.maprdb;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.mapr.fs.jni.MapRConstants;
+import com.mapr.org.apache.hadoop.hbase.util.Bytes;
+
+public class MapRDBSubScanSpec {
+
+  protected String tableName;
+  protected String regionServer;
+  protected byte[] startRow;
+  protected byte[] stopRow;
+  protected byte[] serializedFilter;
+
+  @parquet.org.codehaus.jackson.annotate.JsonCreator
+  public MapRDBSubScanSpec(@JsonProperty("tableName") String tableName,
+                           @JsonProperty("regionServer") String regionServer,
+                           @JsonProperty("startRow") byte[] startRow,
+                           @JsonProperty("stopRow") byte[] stopRow,
+                           @JsonProperty("serializedFilter") byte[] serializedFilter,
+                           @JsonProperty("filterString") String filterString) {
+    if (serializedFilter != null && filterString != null) {
+      throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time.");
+    }
+    this.tableName = tableName;
+    this.regionServer = regionServer;
+    this.startRow = startRow;
+    this.stopRow = stopRow;
+    this.serializedFilter = serializedFilter;
+  }
+
+  /* package */ MapRDBSubScanSpec() {
+    // Empty constructor, to be used with the builder pattern.
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public MapRDBSubScanSpec setTableName(String tableName) {
+    this.tableName = tableName;
+    return this;
+  }
+
+  public String getRegionServer() {
+    return regionServer;
+  }
+
+  public MapRDBSubScanSpec setRegionServer(String regionServer) {
+    this.regionServer = regionServer;
+    return this;
+  }
+
+  /**
+   * @return the raw (not-encoded) start row key for this sub-scan
+   */
+  public byte[] getStartRow() {
+    return startRow == null ? MapRConstants.EMPTY_BYTE_ARRAY: startRow;
+  }
+
+  public MapRDBSubScanSpec setStartRow(byte[] startRow) {
+    this.startRow = startRow;
+    return this;
+  }
+
+  /**
+   * @return the raw (not-encoded) stop row key for this sub-scan
+   */
+  public byte[] getStopRow() {
+    return stopRow == null ? MapRConstants.EMPTY_BYTE_ARRAY : stopRow;
+  }
+
+  public MapRDBSubScanSpec setStopRow(byte[] stopRow) {
+    this.stopRow = stopRow;
+    return this;
+  }
+
+  public byte[] getSerializedFilter() {
+    return serializedFilter;
+  }
+
+  public MapRDBSubScanSpec setSerializedFilter(byte[] serializedFilter) {
+    this.serializedFilter = serializedFilter;
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "MapRDBSubScanSpec [tableName=" + tableName
+        + ", startRow=" + (startRow == null ? null : Bytes.toStringBinary(startRow))
+        + ", stopRow=" + (stopRow == null ? null : Bytes.toStringBinary(stopRow))
+        + ", filter=" + (getSerializedFilter() == null ? null : Bytes.toBase64(getSerializedFilter()))
+        + ", regionServer=" + regionServer + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java
index 3d5320e..d2b1453 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java
@@ -18,26 +18,25 @@
 package org.apache.drill.exec.store.maprdb;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.mapr.TableMappingRulesFactory;
 
 import com.mapr.fs.hbase.HBaseAdminImpl;
 
 public class MapRDBTableStats {
+  private static volatile HBaseAdminImpl admin = null;
 
   private long numRows;
-  private volatile HBaseAdminImpl admin = null;
 
-  public MapRDBTableStats(HTable table) throws Exception {
+  public MapRDBTableStats(Configuration conf, String tablePath) throws Exception {
     if (admin == null) {
       synchronized (MapRDBTableStats.class) {
         if (admin == null) {
-          Configuration config = table.getConfiguration();
-          admin = new HBaseAdminImpl(config, TableMappingRulesFactory.create(config));
+          Configuration config = conf;
+          admin = new HBaseAdminImpl(config, TableMappingRulesFactory.create(conf));
         }
       }
     }
-    numRows = admin.getNumRows(new String(table.getTableName()));
+    numRows = admin.getNumRows(tablePath);
   }
 
   public long getNumRows() {

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java
new file mode 100644
index 0000000..389f00d
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.maprdb;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+
+import com.mapr.db.impl.TabletInfoImpl;
+
+public class TabletFragmentInfo  implements Comparable<TabletFragmentInfo> {
+
+  final private HRegionInfo regionInfo;
+  final private TabletInfoImpl tabletInfoImpl;
+
+  public TabletFragmentInfo(HRegionInfo regionInfo) {
+    this(null, regionInfo);
+  }
+
+  public TabletFragmentInfo(TabletInfoImpl tabletInfoImpl) {
+    this(tabletInfoImpl, null);
+  }
+
+  TabletFragmentInfo(TabletInfoImpl tabletInfoImpl, HRegionInfo regionInfo) {
+    this.regionInfo = regionInfo;
+    this.tabletInfoImpl = tabletInfoImpl;
+  }
+
+  public HRegionInfo getRegionInfo() {
+    return regionInfo;
+  }
+
+  public TabletInfoImpl getTabletInfoImpl() {
+    return tabletInfoImpl;
+  }
+
+  public boolean containsRow(byte[] row) {
+    return tabletInfoImpl != null ? tabletInfoImpl.containsRow(row) :
+        regionInfo.containsRow(row);
+  }
+
+  public byte[] getStartKey() {
+    return tabletInfoImpl != null ? tabletInfoImpl.getStartRow() :
+        regionInfo.getStartKey();
+  }
+
+  public byte[] getEndKey() {
+    return tabletInfoImpl != null ? tabletInfoImpl.getStopRow() :
+        regionInfo.getEndKey();
+  }
+
+  @Override
+  public int compareTo(TabletFragmentInfo o) {
+    return tabletInfoImpl != null ? tabletInfoImpl.compareTo(o.tabletInfoImpl) :
+        regionInfo.compareTo(o.regionInfo);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((regionInfo == null) ? 0 : regionInfo.hashCode());
+    result = prime * result + ((tabletInfoImpl == null) ? 0 : tabletInfoImpl.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    TabletFragmentInfo other = (TabletFragmentInfo) obj;
+    if (regionInfo == null) {
+      if (other.regionInfo != null)
+        return false;
+    } else if (!regionInfo.equals(other.regionInfo))
+      return false;
+    if (tabletInfoImpl == null) {
+      if (other.tabletInfoImpl != null)
+        return false;
+    } else if (!tabletInfoImpl.equals(other.tabletInfoImpl))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "TabletFragmentInfo [regionInfo=" + regionInfo + ", tabletInfoImpl=" + tabletInfoImpl
+        + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/BinaryTableGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/BinaryTableGroupScan.java
new file mode 100644
index 0000000..69fda9c
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/BinaryTableGroupScan.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.maprdb.binary;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.drill.exec.store.maprdb.MapRDBFormatPlugin;
+import org.apache.drill.exec.store.maprdb.MapRDBFormatPluginConfig;
+import org.apache.drill.exec.store.maprdb.MapRDBGroupScan;
+import org.apache.drill.exec.store.maprdb.MapRDBSubScan;
+import org.apache.drill.exec.store.maprdb.MapRDBSubScanSpec;
+import org.apache.drill.exec.store.maprdb.MapRDBTableStats;
+import org.apache.drill.exec.store.maprdb.TabletFragmentInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.codehaus.jackson.annotate.JsonCreator;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("maprdb-binary-scan")
+public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseConstants {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BinaryTableGroupScan.class);
+
+  public static final String TABLE_BINARY = "binary";
+
+  private HBaseScanSpec hbaseScanSpec;
+
+  private HTableDescriptor hTableDesc;
+
+  private MapRDBTableStats tableStats;
+
+  @JsonCreator
+  public BinaryTableGroupScan(@JsonProperty("userName") final String userName,
+                              @JsonProperty("hbaseScanSpec") HBaseScanSpec scanSpec,
+                              @JsonProperty("storage") FileSystemConfig storagePluginConfig,
+                              @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig,
+                              @JsonProperty("columns") List<SchemaPath> columns,
+                              @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
+    this (userName,
+          (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
+          (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
+          scanSpec, columns);
+  }
+
+  public BinaryTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+      MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
+    super(storagePlugin, formatPlugin, columns, userName);
+    this.hbaseScanSpec = scanSpec;
+    init();
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * @param that The BinaryTableGroupScan to clone
+   */
+  private BinaryTableGroupScan(BinaryTableGroupScan that) {
+    super(that);
+    this.hbaseScanSpec = that.hbaseScanSpec;
+    this.endpointFragmentMapping = that.endpointFragmentMapping;
+    this.hTableDesc = that.hTableDesc;
+    this.tableStats = that.tableStats;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    BinaryTableGroupScan newScan = new BinaryTableGroupScan(this);
+    newScan.columns = columns;
+    newScan.verifyColumns();
+    return newScan;
+  }
+
+  private void init() {
+    logger.debug("Getting region locations");
+    try {
+      Configuration conf = HBaseConfiguration.create();
+      HTable table = new HTable(conf, hbaseScanSpec.getTableName());
+      tableStats = new MapRDBTableStats(conf, hbaseScanSpec.getTableName());
+      this.hTableDesc = table.getTableDescriptor();
+      NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations();
+      table.close();
+
+      boolean foundStartRegion = false;
+      regionsToScan = new TreeMap<TabletFragmentInfo, String>();
+      for (Entry<HRegionInfo, ServerName> mapEntry : regionsMap.entrySet()) {
+        HRegionInfo regionInfo = mapEntry.getKey();
+        if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) {
+          continue;
+        }
+        foundStartRegion = true;
+        regionsToScan.put(new TabletFragmentInfo(regionInfo), mapEntry.getValue().getHostname());
+        if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) {
+          break;
+        }
+      }
+    } catch (Exception e) {
+      throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
+    }
+    verifyColumns();
+  }
+
+  private void verifyColumns() {
+    /*
+    if (columns != null) {
+      for (SchemaPath column : columns) {
+        if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
+          DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
+              column.getRootSegment().getPath(), hTableDesc.getNameAsString());
+        }
+      }
+    }
+    */
+  }
+
+  protected MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
+    HBaseScanSpec spec = hbaseScanSpec;
+    MapRDBSubScanSpec subScanSpec = new MapRDBSubScanSpec(
+        spec.getTableName(),
+        regionsToScan.get(tfi),
+        (!isNullOrEmpty(spec.getStartRow()) && tfi.containsRow(spec.getStartRow())) ? spec.getStartRow() : tfi.getStartKey(),
+        (!isNullOrEmpty(spec.getStopRow()) && tfi.containsRow(spec.getStopRow())) ? spec.getStopRow() : tfi.getEndKey(),
+        spec.getSerializedFilter(),
+        null);
+    return subScanSpec;
+  }
+
+  private boolean isNullOrEmpty(byte[] key) {
+    return key == null || key.length == 0;
+  }
+
+  @Override
+  public MapRDBSubScan getSpecificScan(int minorFragmentId) {
+    assert minorFragmentId < endpointFragmentMapping.size() : String.format(
+        "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
+        minorFragmentId);
+    return new MapRDBSubScan(getUserName(), getStoragePlugin(), getStoragePlugin().getConfig(),
+        endpointFragmentMapping.get(minorFragmentId), columns, TABLE_BINARY);
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    //TODO: look at stats for this.
+    long rowCount = (long) ((hbaseScanSpec.getFilter() != null ? .5 : 1) * tableStats.getNumRows());
+    int avgColumnSize = 10;
+    int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new BinaryTableGroupScan(this);
+  }
+
+  @JsonIgnore
+  public Configuration getHBaseConf() {
+    return HBaseConfiguration.create();
+  }
+
+  @JsonIgnore
+  public String getTableName() {
+    return getHBaseScanSpec().getTableName();
+  }
+
+  @Override
+  public String toString() {
+    return "BinaryTableGroupScan [ScanSpec="
+        + hbaseScanSpec + ", columns="
+        + columns + "]";
+  }
+
+  @JsonProperty
+  public HBaseScanSpec getHBaseScanSpec() {
+    return hbaseScanSpec;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/CompareFunctionsProcessor.java
new file mode 100644
index 0000000..f06786d
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/CompareFunctionsProcessor.java
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.maprdb.binary;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.ConvertExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.hadoop.hbase.util.Order;
+import org.apache.hadoop.hbase.util.PositionedByteRange;
+import org.apache.hadoop.hbase.util.SimplePositionedByteRange;
+
+import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+  // Encoded bytes of the literal operand; assigned by visitConvertExpression() and visitSchemaPath().
+  private byte[] value;
+  // Whether the comparison could be decomposed into (column, literal); assigned by process().
+  private boolean success;
+  // True when the function maps to itself in COMPARE_FUNCTIONS_TRANSPOSE_MAP (see constructor).
+  private boolean isEqualityFn;
+  // Column the comparison applies to.
+  private SchemaPath path;
+  // Comparison function name; process() replaces it with the transposed name when the operands are swapped.
+  private String functionName;
+  // Starts out true; set to false when a descending ("_OBD") encoding is visited.
+  private boolean sortOrderAscending;
+
+  // Fields for row-key prefix comparison.
+  // If the query is on a row-key prefix, we cannot use a standard template to identify startRow, stopRow and filter.
+  // Hence, we use these fields (set depending upon the encoding type in the user query).
+  private boolean isRowKeyPrefixComparison;
+  byte[] rowKeyPrefixStartRow;
+  byte[] rowKeyPrefixStopRow;
+  Filter rowKeyPrefixFilter;
+
+  /**
+   * Tells whether {@code functionName} is one of the comparison functions known to this processor.
+   */
+  public static boolean isCompareFunction(String functionName) {
+    final boolean known = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName);
+    return known;
+  }
+
+  /**
+   * Decomposes a comparison call into a column reference and an encoded literal.
+   *
+   * @param call the comparison function call to analyse
+   * @param nullComparatorSupported whether single-argument (null test) calls may be accepted
+   * @return a processor whose {@link #isSuccess()} reports whether the call was understood
+   */
+  public static CompareFunctionsProcessor process(FunctionCall call, boolean nullComparatorSupported) {
+    final String name = call.getName();
+    LogicalExpression nameArg = call.args.get(0);
+    final boolean isBinary = call.args.size() >= 2;
+    LogicalExpression valueArg = isBinary ? call.args.get(1) : null;
+    final CompareFunctionsProcessor evaluator = new CompareFunctionsProcessor(name);
+
+    if (valueArg == null) {
+      // unary function: only accepted for a plain column when null comparators are supported
+      if (nullComparatorSupported && nameArg instanceof SchemaPath) {
+        evaluator.success = true;
+        evaluator.path = (SchemaPath) nameArg;
+      }
+      return evaluator;
+    }
+
+    // binary function: when the literal is on the left, swap the operands and transpose the operator
+    if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+      final LogicalExpression literal = nameArg;
+      nameArg = valueArg;
+      valueArg = literal;
+      evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(name);
+    }
+    evaluator.success = nameArg.accept(evaluator, valueArg);
+    return evaluator;
+  }
+
+  public CompareFunctionsProcessor(String functionName) {
+    this.success = false;
+    this.functionName = functionName;
+    this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)
+        && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(functionName);
+    this.isRowKeyPrefixComparison = false;
+    this.sortOrderAscending = true;
+  }
+
+  /** Returns the encoded literal bytes, or {@code null} if none were produced. */
+  public byte[] getValue() {
+    return this.value;
+  }
+
+  /** Returns whether the processed call was understood. */
+  public boolean isSuccess() {
+    return this.success;
+  }
+
+  /** Returns the column the comparison applies to. */
+  public SchemaPath getPath() {
+    return this.path;
+  }
+
+  /** Returns the (possibly transposed) comparison function name. */
+  public String getFunctionName() {
+    return this.functionName;
+  }
+
+  /** Returns whether the comparison targets a row-key prefix. */
+  public boolean isRowKeyPrefixComparison() {
+    return this.isRowKeyPrefixComparison;
+  }
+
+  /** Returns the start row computed for a row-key prefix comparison. */
+  public byte[] getRowKeyPrefixStartRow() {
+    return this.rowKeyPrefixStartRow;
+  }
+
+  /** Returns the stop row computed for a row-key prefix comparison. */
+  public byte[] getRowKeyPrefixStopRow() {
+    return this.rowKeyPrefixStopRow;
+  }
+
+  /** Returns the filter computed for a row-key prefix comparison, if any. */
+  public Filter getRowKeyPrefixFilter() {
+    return this.rowKeyPrefixFilter;
+  }
+
+  /** Returns {@code false} once a descending encoding has been visited. */
+  public boolean isSortOrderAscending() {
+    return this.sortOrderAscending;
+  }
+
+  /**
+   * Looks through a cast whose input is another cast or a column reference;
+   * any other input is rejected.
+   */
+  @Override
+  public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
+    final LogicalExpression input = e.getInput();
+    if (!(input instanceof CastExpression) && !(input instanceof SchemaPath)) {
+      return false;
+    }
+    return input.accept(this, valueArg);
+  }
+
+  /**
+   * Handles {@code CONVERT_FROM(<input>, '<encoding>')} on the column side of a comparison.
+   * <p>
+   * Two input shapes are recognised:
+   * <ul>
+   * <li>{@code BYTE_SUBSTR(row_key, 1, n)} with a big-endian ("_BE") encoding, which is delegated to
+   * {@link #visitRowKeyPrefixConvertExpression} to compute a row-key prefix range;</li>
+   * <li>a plain column, for which the literal is encoded into {@link #getValue()} according to the
+   * encoding type.</li>
+   * </ul>
+   *
+   * @param e the convert expression found on the column side
+   * @param valueArg the literal operand of the comparison
+   * @return {@code true} if the comparison was understood, {@code false} otherwise
+   */
+  @Override
+  public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression valueArg) throws RuntimeException {
+    // Compare the function name by value; reference comparison on Strings only works
+    // if the instance happens to be the very same constant object.
+    if (!ConvertExpression.CONVERT_FROM.equals(e.getConvertFunction())) {
+      return false;
+    }
+
+    String encodingType = e.getEncodingType();
+
+    // Handle scan pruning in the following scenario:
+    // The row-key is a composite key and the CONVERT_FROM() function has byte_substr() as input function which is
+    // querying for the first few bytes of the row-key(start-offset 1)
+    // Example WHERE clause:
+    // CONVERT_FROM(BYTE_SUBSTR(row_key, 1, 8), 'DATE_EPOCH_BE') < DATE '2015-06-17'
+    if (e.getInput() instanceof FunctionCall) {
+
+      // We can prune scan range only for big-endian encoded data
+      if (!encodingType.endsWith("_BE")) {
+        return false;
+      }
+
+      FunctionCall call = (FunctionCall) e.getInput();
+      if (!call.getName().equalsIgnoreCase("byte_substr")) {
+        return false;
+      }
+
+      LogicalExpression nameArg = call.args.get(0);
+      LogicalExpression valueArg1 = call.args.size() >= 2 ? call.args.get(1) : null;
+      LogicalExpression valueArg2 = call.args.size() >= 3 ? call.args.get(2) : null;
+
+      // instanceof is false for null, so a missing offset or length argument is rejected here too
+      if (!(nameArg instanceof SchemaPath)
+          || !(valueArg1 instanceof IntExpression)
+          || !(valueArg2 instanceof IntExpression)) {
+        return false;
+      }
+
+      boolean isRowKey = ((SchemaPath) nameArg).getAsUnescapedPath().equals(DrillHBaseConstants.ROW_KEY);
+      int offset = ((IntExpression) valueArg1).getInt();
+
+      // only a prefix of the row key (start offset 1) can be turned into a scan range
+      if (!isRowKey || (offset != 1)) {
+        return false;
+      }
+
+      this.path = (SchemaPath) nameArg;
+      int prefixLength = ((IntExpression) valueArg2).getInt();
+      this.isRowKeyPrefixComparison = true;
+      return visitRowKeyPrefixConvertExpression(e, prefixLength, valueArg);
+    }
+
+    if (!(e.getInput() instanceof SchemaPath)) {
+      return false;
+    }
+
+    ByteBuf bb = null;
+
+    switch (encodingType) {
+    case "INT_BE":
+    case "INT":
+    case "UINT_BE":
+    case "UINT":
+    case "UINT4_BE":
+    case "UINT4":
+      if (valueArg instanceof IntExpression
+          && (isEqualityFn || encodingType.startsWith("U"))) {
+        bb = newByteBuf(4, encodingType.endsWith("_BE"));
+        bb.writeInt(((IntExpression) valueArg).getInt());
+      }
+      break;
+    case "BIGINT_BE":
+    case "BIGINT":
+    case "UINT8_BE":
+    case "UINT8":
+      if (valueArg instanceof LongExpression
+          && (isEqualityFn || encodingType.startsWith("U"))) {
+        bb = newByteBuf(8, encodingType.endsWith("_BE"));
+        bb.writeLong(((LongExpression) valueArg).getLong());
+      }
+      break;
+    case "FLOAT":
+      if (valueArg instanceof FloatExpression && isEqualityFn) {
+        bb = newByteBuf(4, true);
+        bb.writeFloat(((FloatExpression) valueArg).getFloat());
+      }
+      break;
+    case "DOUBLE":
+      if (valueArg instanceof DoubleExpression && isEqualityFn) {
+        bb = newByteBuf(8, true);
+        bb.writeDouble(((DoubleExpression) valueArg).getDouble());
+      }
+      break;
+    case "TIME_EPOCH":
+    case "TIME_EPOCH_BE":
+      if (valueArg instanceof TimeExpression) {
+        bb = newByteBuf(8, encodingType.endsWith("_BE"));
+        bb.writeLong(((TimeExpression) valueArg).getTime());
+      }
+      break;
+    case "DATE_EPOCH":
+    case "DATE_EPOCH_BE":
+      if (valueArg instanceof DateExpression) {
+        bb = newByteBuf(8, encodingType.endsWith("_BE"));
+        bb.writeLong(((DateExpression) valueArg).getDate());
+      }
+      break;
+    case "BOOLEAN_BYTE":
+      if (valueArg instanceof BooleanExpression) {
+        bb = newByteBuf(1, false /* does not matter */);
+        bb.writeByte(((BooleanExpression) valueArg).getBoolean() ? 1 : 0);
+      }
+      break;
+    case "DOUBLE_OB":
+    case "DOUBLE_OBD":
+      if (valueArg instanceof DoubleExpression) {
+        bb = newByteBuf(9, true);
+        PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
+        org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
+            ((DoubleExpression) valueArg).getDouble(), orderedBytesSortOrder(encodingType));
+      }
+      break;
+    case "FLOAT_OB":
+    case "FLOAT_OBD":
+      if (valueArg instanceof FloatExpression) {
+        bb = newByteBuf(5, true);
+        PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
+        org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
+            ((FloatExpression) valueArg).getFloat(), orderedBytesSortOrder(encodingType));
+      }
+      break;
+    case "BIGINT_OB":
+    case "BIGINT_OBD":
+      if (valueArg instanceof LongExpression) {
+        bb = newByteBuf(9, true);
+        PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
+        org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
+            ((LongExpression) valueArg).getLong(), orderedBytesSortOrder(encodingType));
+      }
+      break;
+    case "INT_OB":
+    case "INT_OBD":
+      if (valueArg instanceof IntExpression) {
+        bb = newByteBuf(5, true);
+        PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
+        org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
+            ((IntExpression) valueArg).getInt(), orderedBytesSortOrder(encodingType));
+      }
+      break;
+    case "UTF8_OB":
+    case "UTF8_OBD":
+      if (valueArg instanceof QuotedString) {
+        String literal = ((QuotedString) valueArg).value;
+        // two extra bytes are reserved in addition to the UTF-8 payload
+        int encodedLength = literal.getBytes(Charsets.UTF_8).length + 2;
+        bb = newByteBuf(encodedLength, true);
+        PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, encodedLength);
+        org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
+            literal, orderedBytesSortOrder(encodingType));
+      }
+      break;
+    case "UTF8":
+      // let visitSchemaPath() handle this.
+      return e.getInput().accept(this, valueArg);
+    }
+
+    if (bb == null) {
+      // unsupported encoding, or the literal type does not match the encoding
+      return false;
+    }
+
+    this.value = bb.array();
+    this.path = (SchemaPath) e.getInput();
+    return true;
+  }
+
+  /**
+   * Picks the OrderedBytes sort order for an "_OB" / "_OBD" encoding name and records a
+   * descending order in {@link #sortOrderAscending}.
+   */
+  private Order orderedBytesSortOrder(String encodingType) {
+    if (encodingType.endsWith("_OBD")) {
+      this.sortOrderAscending = false;
+      return Order.DESCENDING;
+    }
+    return Order.ASCENDING;
+  }
+
+  private Boolean visitRowKeyPrefixConvertExpression(ConvertExpression e,
+    int prefixLength, LogicalExpression valueArg) {
+    String encodingType = e.getEncodingType();
+    rowKeyPrefixStartRow = HConstants.EMPTY_START_ROW;
+    rowKeyPrefixStopRow  = HConstants.EMPTY_START_ROW;
+    rowKeyPrefixFilter   = null;
+
+    if ((encodingType.compareTo("UINT4_BE") == 0) ||
+        (encodingType.compareTo("UINT_BE") == 0)) {
+      if (prefixLength != 4) {
+        throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
+      }
+
+      int val;
+      if ((valueArg instanceof IntExpression) == false) {
+        return false;
+      }
+
+      val = ((IntExpression)valueArg).getInt();
+
+      // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >=
+      switch (functionName) {
+      case "equal":
+        rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(4).putInt(val).array());
+        rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array();
+        rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array();
+        return true;
+      case "greater_than_or_equal_to":
+        rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array();
+        return true;
+      case "greater_than":
+        rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val + 1).array();
+        return true;
+      case "less_than_or_equal_to":
+        rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array();
+        return true;
+      case "less_than":
+        rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val).array();
+        return true;
+      }
+
+      return false;
+    }
+
+    if ((encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) ||
+        (encodingType.compareTo("TIME_EPOCH_BE") == 0) ||
+        (encodingType.compareTo("UINT8_BE") == 0)) {
+
+      if (prefixLength != 8) {
+        throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
+      }
+
+      long val;
+      if (encodingType.compareTo("TIME_EPOCH_BE") == 0) {
+        if ((valueArg instanceof TimeExpression) == false) {
+          return false;
+        }
+
+        val = ((TimeExpression)valueArg).getTime();
+      } else if (encodingType.compareTo("UINT8_BE") == 0){
+        if ((valueArg instanceof LongExpression) == false) {
+          return false;
+        }
+
+        val = ((LongExpression)valueArg).getLong();
+      } else if (encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) {
+        if ((valueArg instanceof TimeStampExpression) == false) {
+          return false;
+        }
+
+        val = ((TimeStampExpression)valueArg).getTimeStamp();
+      } else {
+        // Should not reach here.
+        return false;
+      }
+
+      // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >=
+      switch (functionName) {
+      case "equal":
+        rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(8).putLong(val).array());
+        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array();
+        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array();
+        return true;
+      case "greater_than_or_equal_to":
+        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array();
+        return true;
+      case "greater_than":
+        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val + 1).array();
+        return true;
+      case "less_than_or_equal_to":
+        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array();
+        return true;
+      case "less_than":
+        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val).array();
+        return true;
+      }
+
+      return false;
+    }
+
+    if (encodingType.compareTo("DATE_EPOCH_BE") == 0) {
+      if ((valueArg instanceof DateExpression) == false) {
+        return false;
+      }
+
+      if (prefixLength != 8) {
+        throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
+      }
+
+      final long MILLISECONDS_IN_A_DAY  = (long)1000 * 60 * 60 * 24;
+      long dateToSet;
+      // For DATE encoding, the operators that we push-down are =, <>, <, <=, >, >=
+      switch (functionName) {
+      case "equal":
+        long startDate = ((DateExpression)valueArg).getDate();
+        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(startDate).array();
+        long stopDate  = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
+        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(stopDate).array();
+        return true;
+      case "greater_than_or_equal_to":
+        dateToSet = ((DateExpression)valueArg).getDate();
+        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
+        return true;
+      case "greater_than":
+        dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
+        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
+        return true;
+      case "less_than_or_equal_to":
+        dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
+        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
+        return true;
+      case "less_than":
+        dateToSet = ((DateExpression)valueArg).getDate();
+        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
+        return true;
+      }
+
+      return false;
+    }
+
+    return false;
+  }
+
+  /**
+   * Fallback for expression types this processor does not handle: always rejects.
+   */
+  @Override
+  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
+    final boolean understood = false;
+    return understood;
+  }
+
+  /**
+   * Accepts a plain column compared against a quoted string literal, storing the
+   * literal's UTF-8 bytes as the comparison value. Other literal types are rejected.
+   */
+  @Override
+  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
+    if (!(valueArg instanceof QuotedString)) {
+      return false;
+    }
+    final QuotedString literal = (QuotedString) valueArg;
+    this.value = literal.value.getBytes(Charsets.UTF_8);
+    this.path = path;
+    return true;
+  }
+
+  private static ByteBuf newByteBuf(int size, boolean bigEndian) {
+    return Unpooled.wrappedBuffer(new byte[size])
+        .order(bigEndian ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN)
+        .writerIndex(0);
+  }
+
+  /**
+   * Literal expression classes. When the first operand of a binary comparison is one of these,
+   * process() swaps the operands and transposes the operator.
+   * TimeStampExpression is included because the row-key prefix path accepts timestamp literals;
+   * without it a "literal op column" timestamp comparison would never be transposed.
+   */
+  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+  static {
+    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+    VALUE_EXPRESSION_CLASSES = builder
+        .add(BooleanExpression.class)
+        .add(DateExpression.class)
+        .add(DoubleExpression.class)
+        .add(FloatExpression.class)
+        .add(IntExpression.class)
+        .add(LongExpression.class)
+        .add(QuotedString.class)
+        .add(TimeExpression.class)
+        .add(TimeStampExpression.class)
+        .build();
+  }
+
+  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+  static {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+        // unary functions
+        .put("isnotnull", "isnotnull")
+        .put("isNotNull", "isNotNull")
+        .put("is not null", "is not null")
+        .put("isnull", "isnull")
+        .put("isNull", "isNull")
+        .put("is null", "is null")
+        // binary functions
+        .put("like", "like")
+        .put("equal", "equal")
+        .put("not_equal", "not_equal")
+        .put("greater_than_or_equal_to", "less_than_or_equal_to")
+        .put("greater_than", "less_than")
+        .put("less_than_or_equal_to", "greater_than_or_equal_to")
+        .put("less_than", "greater_than")
+        .build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBFilterBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBFilterBuilder.java
new file mode 100644
index 0000000..800d155
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBFilterBuilder.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.maprdb.binary;
+
+import java.util.Arrays;
+
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
+import org.apache.drill.exec.store.hbase.HBaseRegexParser;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.drill.exec.store.hbase.HBaseUtils;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.NullComparator;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+
+public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
+
+  // Group scan whose existing scan spec, table name and HBase configuration are consulted while building.
+  final private BinaryTableGroupScan groupScan;
+
+  // Root of the filter expression tree visited by parseTree().
+  final private LogicalExpression le;
+
+  // Cleared whenever some part of the expression could not be turned into a scan spec.
+  private boolean allExpressionsConverted = true;
+
+  // Lazily read from the "drill.hbase.supports.null.comparator" setting on first use in visitFunctionCall();
+  // static, so the value is shared by all builder instances.
+  private static Boolean nullComparatorSupported = null;
+
+  /**
+   * Creates a builder for the given filter expression against the given group scan.
+   *
+   * @param groupScan scan the filter is being pushed into
+   * @param le filter expression to convert
+   */
+  MapRDBFilterBuilder(BinaryTableGroupScan groupScan, LogicalExpression le) {
+    this.le = le;
+    this.groupScan = groupScan;
+  }
+
+  /**
+   * Converts the filter expression into a scan spec and ANDs it with the group scan's current spec.
+   *
+   * @return the combined scan spec, or {@code null} if nothing could be converted
+   */
+  public HBaseScanSpec parseTree() {
+    final HBaseScanSpec visited = le.accept(this, null);
+    if (visited == null) {
+      return null;
+    }
+
+    final HBaseScanSpec merged = mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), visited);
+    final Filter filter = merged.getFilter();
+    if (!(filter instanceof RowFilter)) {
+      return merged;
+    }
+
+    /*
+     * If RowFilter is THE filter attached to the scan specification,
+     * remove it since its effect is also achieved through startRow and stopRow.
+     */
+    final RowFilter rowFilter = (RowFilter) filter;
+    final boolean coveredByRange = rowFilter.getOperator() != CompareOp.NOT_EQUAL
+        && rowFilter.getComparator() instanceof BinaryComparator;
+    if (!coveredByRange) {
+      return merged;
+    }
+    return new HBaseScanSpec(merged.getTableName(), merged.getStartRow(), merged.getStopRow(), null);
+  }
+
+  /**
+   * Returns {@code true} while every visited expression has been converted into the scan spec.
+   */
+  public boolean isAllExpressionsConverted() {
+    return this.allExpressionsConverted;
+  }
+
+  /**
+   * Fallback for unsupported expressions: records that conversion is incomplete and yields no spec.
+   */
+  @Override
+  public HBaseScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
+    this.allExpressionsConverted = false;
+    final HBaseScanSpec noSpec = null;
+    return noSpec;
+  }
+
+  /**
+   * Boolean operators are handled by the same logic as ordinary function calls.
+   */
+  @Override
+  public HBaseScanSpec visitBooleanOperator(BooleanOperator op, Void value) throws RuntimeException {
+    final HBaseScanSpec spec = visitFunctionCall(op, value);
+    return spec;
+  }
+
+  @Override
+  public HBaseScanSpec visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
+    HBaseScanSpec nodeScanSpec = null;
+    String functionName = call.getName();
+    ImmutableList<LogicalExpression> args = call.args;
+
+    if (CompareFunctionsProcessor.isCompareFunction(functionName)) {
+      /*
+       * HBASE-10848: Bug in HBase versions (0.94.[0-18], 0.96.[0-2], 0.98.[0-1])
+       * causes a filter with NullComparator to fail. Enable only if specified in
+       * the configuration (after ensuring that the HBase cluster has the fix).
+       */
+      if (nullComparatorSupported == null) {
+        nullComparatorSupported = groupScan.getHBaseConf().getBoolean("drill.hbase.supports.null.comparator", false);
+      }
+
+      CompareFunctionsProcessor processor = CompareFunctionsProcessor.process(call, nullComparatorSupported);
+      if (processor.isSuccess()) {
+        nodeScanSpec = createHBaseScanSpec(call, processor);
+      }
+    } else {
+      switch (functionName) {
+      case "booleanAnd":
+      case "booleanOr":
+        HBaseScanSpec firstScanSpec = args.get(0).accept(this, null);
+        for (int i = 1; i < args.size(); ++i) {
+          HBaseScanSpec nextScanSpec = args.get(i).accept(this, null);
+          if (firstScanSpec != null && nextScanSpec != null) {
+            nodeScanSpec = mergeScanSpecs(functionName, firstScanSpec, nextScanSpec);
+          } else {
+            allExpressionsConverted = false;
+            if ("booleanAnd".equals(functionName)) {
+              nodeScanSpec = firstScanSpec == null ? nextScanSpec : firstScanSpec;
+            }
+          }
+          firstScanSpec = nodeScanSpec;
+        }
+        break;
+      }
+    }
+
+    if (nodeScanSpec == null) {
+      allExpressionsConverted = false;
+    }
+
+    return nodeScanSpec;
+  }
+
+  private HBaseScanSpec mergeScanSpecs(String functionName, HBaseScanSpec leftScanSpec, HBaseScanSpec rightScanSpec) {
+    Filter newFilter = null;
+    byte[] startRow = HConstants.EMPTY_START_ROW;
+    byte[] stopRow = HConstants.EMPTY_END_ROW;
+
+    switch (functionName) {
+    case "booleanAnd":
+      newFilter = HBaseUtils.andFilterAtIndex(leftScanSpec.getFilter(), -1, rightScanSpec.getFilter()); //HBaseUtils.LAST_FILTER
+      startRow = HBaseUtils.maxOfStartRows(leftScanSpec.getStartRow(), rightScanSpec.getStartRow());
+      stopRow = HBaseUtils.minOfStopRows(leftScanSpec.getStopRow(), rightScanSpec.getStopRow());
+      break;
+    case "booleanOr":
+      newFilter = HBaseUtils.orFilterAtIndex(leftScanSpec.getFilter(), -1, rightScanSpec.getFilter()); //HBaseUtils.LAST_FILTER
+      startRow = HBaseUtils.minOfStartRows(leftScanSpec.getStartRow(), rightScanSpec.getStartRow());
+      stopRow = HBaseUtils.maxOfStopRows(leftScanSpec.getStopRow(), rightScanSpec.getStopRow());
+    }
+    return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, newFilter);
+  }
+
+  private HBaseScanSpec createHBaseScanSpec(FunctionCall call, CompareFunctionsProcessor processor) {
+    String functionName = processor.getFunctionName();
+    SchemaPath field = processor.getPath();
+    byte[] fieldValue = processor.getValue();
+    boolean sortOrderAscending = processor.isSortOrderAscending();
+    boolean isRowKey = field.getAsUnescapedPath().equals(ROW_KEY);
+    if (!(isRowKey
+        || (!field.getRootSegment().isLastPath()
+            && field.getRootSegment().getChild().isLastPath()
+            && field.getRootSegment().getChild().isNamed())
+           )
+        ) {
+      /*
+       * if the field in this function is neither the row_key nor a qualified HBase column, return.
+       */
+      return null;
+    }
+
+    if (processor.isRowKeyPrefixComparison()) {
+      return createRowKeyPrefixScanSpec(call, processor);
+    }
+
+    CompareOp compareOp = null;
+    boolean isNullTest = false;
+    ByteArrayComparable comparator = new BinaryComparator(fieldValue);
+    byte[] startRow = HConstants.EMPTY_START_ROW;
+    byte[] stopRow = HConstants.EMPTY_END_ROW;
+    switch (functionName) {
+    case "equal":
+      compareOp = CompareOp.EQUAL;
+      if (isRowKey) {
+        startRow = fieldValue;
+        /* stopRow should be just greater than 'value'*/
+        stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+        compareOp = CompareOp.EQUAL;
+      }
+      break;
+    case "not_equal":
+      compareOp = CompareOp.NOT_EQUAL;
+      break;
+    case "greater_than_or_equal_to":
+      if (sortOrderAscending) {
+        compareOp = CompareOp.GREATER_OR_EQUAL;
+        if (isRowKey) {
+          startRow = fieldValue;
+        }
+      } else {
+        compareOp = CompareOp.LESS_OR_EQUAL;
+        if (isRowKey) {
+          // stopRow should be just greater than 'value'
+          stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+        }
+      }
+      break;
+    case "greater_than":
+      if (sortOrderAscending) {
+        compareOp = CompareOp.GREATER;
+        if (isRowKey) {
+          // startRow should be just greater than 'value'
+          startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+        }
+      } else {
+        compareOp = CompareOp.LESS;
+        if (isRowKey) {
+          stopRow = fieldValue;
+        }
+      }
+      break;
+    case "less_than_or_equal_to":
+      if (sortOrderAscending) {
+        compareOp = CompareOp.LESS_OR_EQUAL;
+        if (isRowKey) {
+          // stopRow should be just greater than 'value'
+          stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+        }
+      } else {
+        compareOp = CompareOp.GREATER_OR_EQUAL;
+        if (isRowKey) {
+          startRow = fieldValue;
+        }
+      }
+      break;
+    case "less_than":
+      if (sortOrderAscending) {
+        compareOp = CompareOp.LESS;
+        if (isRowKey) {
+          stopRow = fieldValue;
+        }
+      } else {
+        compareOp = CompareOp.GREATER;
+        if (isRowKey) {
+          // startRow should be just greater than 'value'
+          startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
+        }
+      }
+      break;
+    case "isnull":
+    case "isNull":
+    case "is null":
+      if (isRowKey) {
+        return null;
+      }
+      isNullTest = true;
+      compareOp = CompareOp.EQUAL;
+      comparator = new NullComparator();
+      break;
+    case "isnotnull":
+    case "isNotNull":
+    case "is not null":
+      if (isRowKey) {
+        return null;
+      }
+      compareOp = CompareOp.NOT_EQUAL;
+      comparator = new NullComparator();
+      break;
+    case "like":
+      /*
+       * Convert the LIKE operand to Regular Expression pattern so that we can
+       * apply RegexStringComparator()
+       */
+      HBaseRegexParser parser = new HBaseRegexParser(call).parse();
+      compareOp = CompareOp.EQUAL;
+      comparator = new RegexStringComparator(parser.getRegexString());
+
+      /*
+       * We can possibly do better if the LIKE operator is on the row_key
+       */
+      if (isRowKey) {
+        String prefix = parser.getPrefixString();
+        if (prefix != null) { // group 3 is literal
+          /*
+           * If there is a literal prefix, it can help us prune the scan to a sub range
+           */
+          if (prefix.equals(parser.getLikeString())) {
+            /* The operand value is literal. This turns the LIKE operator to EQUAL operator */
+            startRow = stopRow = fieldValue;
+            compareOp = null;
+          } else {
+            startRow = prefix.getBytes(Charsets.UTF_8);
+            stopRow = startRow.clone();
+            boolean isMaxVal = true;
+            for (int i = stopRow.length - 1; i >= 0 ; --i) {
+              int nextByteValue = (0xff & stopRow[i]) + 1;
+              if (nextByteValue < 0xff) {
+                stopRow[i] = (byte) nextByteValue;
+                isMaxVal = false;
+                break;
+              } else {
+                stopRow[i] = 0;
+              }
+            }
+            if (isMaxVal) {
+              stopRow = HConstants.EMPTY_END_ROW;
+            }
+          }
+        }
+      }
+      break;
+    }
+
+    if (compareOp != null || startRow != HConstants.EMPTY_START_ROW || stopRow != HConstants.EMPTY_END_ROW) {
+      Filter filter = null;
+      if (isRowKey) {
+        if (compareOp != null) {
+          filter = new RowFilter(compareOp, comparator);
+        }
+      } else {
+        byte[] family = HBaseUtils.getBytes(field.getRootSegment().getPath());
+        byte[] qualifier = HBaseUtils.getBytes(field.getRootSegment().getChild().getNameSegment().getPath());
+        filter = new SingleColumnValueFilter(family, qualifier, compareOp, comparator);
+        ((SingleColumnValueFilter)filter).setLatestVersionOnly(true);
+        if (!isNullTest) {
+          ((SingleColumnValueFilter)filter).setFilterIfMissing(true);
+        }
+      }
+      return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
+    }
+    // else
+    return null;
+  }
+
+  private HBaseScanSpec createRowKeyPrefixScanSpec(FunctionCall call, // 'call' is not read in this method
+      CompareFunctionsProcessor processor) {
+    byte[] startRow = processor.getRowKeyPrefixStartRow(); // row range and filter come straight from the processor
+    byte[] stopRow  = processor.getRowKeyPrefixStopRow();
+    Filter filter   = processor.getRowKeyPrefixFilter();
+    // Build a spec only if something was set; '!=' is a reference check against the shared empty-row constants.
+    if (startRow != HConstants.EMPTY_START_ROW ||
+      stopRow != HConstants.EMPTY_END_ROW ||
+      filter != null) {
+      return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
+    }
+
+    // Both bounds are still the empty-row constants and there is no filter: no scan spec is produced.
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBPushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBPushFilterIntoScan.java
new file mode 100644
index 0000000..5adff38
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/binary/MapRDBPushFilterIntoScan.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.maprdb.binary;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rex.RexNode;
+
+import com.google.common.collect.ImmutableList;
+
+public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class);
+  // Private constructor: within this class the only instances are the two anonymous subclasses declared below.
+  private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+  // Rule for a FilterPrel directly on top of a ScanPrel (rel(0) = filter, rel(1) = scan).
+  public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan:Filter_On_Scan") {
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final ScanPrel scan = (ScanPrel) call.rel(1);
+      final FilterPrel filter = (FilterPrel) call.rel(0);
+      final RexNode condition = filter.getCondition();
+
+      BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan();
+      if (groupScan.isFilterPushedDown()) {
+        /*
+         * The rule can get triggered again due to the transformed "scan => filter" sequence
+         * created by the earlier execution of this rule when we could not do a complete
+         * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
+         * this flag to not do a re-processing of the rule on the already transformed call.
+         */
+        return;
+      }
+      // There is no project between the filter and the scan, so null is passed for the project argument.
+      doPushFilterToScan(call, filter, null, scan, groupScan, condition);
+    }
+    // Restricts the rule to scans whose group scan is a BinaryTableGroupScan (onMatch casts to that type).
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final ScanPrel scan = (ScanPrel) call.rel(1);
+      if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
+        return super.matches(call);
+      }
+      return false;
+    }
+  };
+  // Rule for FilterPrel over ProjectPrel over ScanPrel (rel(0) = filter, rel(1) = project, rel(2) = scan).
+  public static final StoragePluginOptimizerRule FILTER_ON_PROJECT = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))), "MapRDBPushFilterIntoScan:Filter_On_Project") {
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final ScanPrel scan = (ScanPrel) call.rel(2);
+      final ProjectPrel project = (ProjectPrel) call.rel(1);
+      final FilterPrel filter = (FilterPrel) call.rel(0);
+
+      BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan();
+      if (groupScan.isFilterPushedDown()) {
+        /*
+         * The rule can get triggered again due to the transformed "scan => filter" sequence
+         * created by the earlier execution of this rule when we could not do a complete
+         * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
+         * this flag to not do a re-processing of the rule on the already transformed call.
+         */
+         return;
+      }
+
+      // convert the filter to one that references the child of the project
+      final RexNode condition =  RelOptUtil.pushFilterPastProject(filter.getCondition(), project);
+
+      doPushFilterToScan(call, filter, project, scan, groupScan, condition);
+    }
+    // Restricts the rule to scans whose group scan is a BinaryTableGroupScan (onMatch casts to that type).
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final ScanPrel scan = (ScanPrel) call.rel(2);
+      if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
+        return super.matches(call);
+      }
+      return false;
+    }
+  };
+  // Shared rewrite used by both rules; 'project' is null when the filter sits directly on the scan.
+  protected void doPushFilterToScan(final RelOptRuleCall call, final FilterPrel filter, final ProjectPrel project, final ScanPrel scan, final BinaryTableGroupScan groupScan, final RexNode condition) {
+    // Convert the condition to a Drill LogicalExpression, then ask MapRDBFilterBuilder for a scan spec.
+    final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+    final MapRDBFilterBuilder maprdbFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp);
+    final HBaseScanSpec newScanSpec = maprdbFilterBuilder.parseTree();
+    if (newScanSpec == null) {
+      return; //no filter pushdown ==> No transformation.
+    }
+    // Rebuild the group scan around the new spec and mark it pushed down; onMatch() returns early on that flag.
+    final BinaryTableGroupScan newGroupsScan = new BinaryTableGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
+                                                              groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns());
+    newGroupsScan.setFilterPushedDown(true);
+    // The new ScanPrel takes the filter's trait set and keeps the original scan's row type.
+    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
+
+    // Depending on whether there is a project in the middle, assign either the scan or a copy of the project to childRel.
+    final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));; // NOTE(review): second ';' is a redundant empty statement
+
+    if (maprdbFilterBuilder.isAllExpressionsConverted()) {
+        /*
+         * Since we could convert the entire filter condition expression into an HBase filter,
+         * we can eliminate the filter operator altogether.
+         */
+      call.transformTo(childRel);
+    } else { // not everything was converted: keep a copy of the filter on top of the new child
+      call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonTableGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonTableGroupScan.java
new file mode 100644
index 0000000..e798c52
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/JsonTableGroupScan.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.maprdb.json;
+
+import static org.apache.drill.exec.store.maprdb.util.CommonFns.isNullOrEmpty;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.maprdb.MapRDBFormatPlugin;
+import org.apache.drill.exec.store.maprdb.MapRDBFormatPluginConfig;
+import org.apache.drill.exec.store.maprdb.MapRDBGroupScan;
+import org.apache.drill.exec.store.maprdb.MapRDBSubScan;
+import org.apache.drill.exec.store.maprdb.MapRDBSubScanSpec;
+import org.apache.drill.exec.store.maprdb.MapRDBTableStats;
+import org.apache.drill.exec.store.maprdb.TabletFragmentInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.codehaus.jackson.annotate.JsonCreator;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.mapr.db.MapRDB;
+import com.mapr.db.Table;
+import com.mapr.db.TabletInfo;
+import com.mapr.db.impl.TabletInfoImpl;
+
+@JsonTypeName("maprdb-json-scan")
+public class JsonTableGroupScan extends MapRDBGroupScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonTableGroupScan.class);
+
+  public static final String TABLE_JSON = "json";
+
+  private MapRDBTableStats tableStats;
+
+  private MapRDBSubScanSpec subscanSpec;
+
+  @JsonCreator
+  public JsonTableGroupScan(@JsonProperty("userName") final String userName,
+                            @JsonProperty("subscanSpec") MapRDBSubScanSpec subscanSpec,
+                            @JsonProperty("storage") FileSystemConfig storagePluginConfig,
+                            @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig,
+                            @JsonProperty("columns") List<SchemaPath> columns,
+                            @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
+    this (userName,
+          (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
+          (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
+          subscanSpec, columns);
+  }
+
+  public JsonTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+      MapRDBFormatPlugin formatPlugin, MapRDBSubScanSpec subscanSpec, List<SchemaPath> columns) {
+    super(storagePlugin, formatPlugin, columns, userName);
+    this.subscanSpec = subscanSpec;
+    init();
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * @param that The HBaseGroupScan to clone
+   */
+  private JsonTableGroupScan(JsonTableGroupScan that) {
+    super(that);
+    this.subscanSpec = that.subscanSpec;
+    this.endpointFragmentMapping = that.endpointFragmentMapping;
+    this.tableStats = that.tableStats;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    JsonTableGroupScan newScan = new JsonTableGroupScan(this);
+    newScan.columns = columns;
+    return newScan;
+  }
+
+  private void init() {
+    logger.debug("Getting tablet locations");
+    try {
+      Configuration conf = new Configuration();
+      Table t = MapRDB.getTable(subscanSpec.getTableName());
+      TabletInfo[] tabletInfos = t.getTabletInfos();
+      tableStats = new MapRDBTableStats(conf, subscanSpec.getTableName());
+
+      boolean foundStartRegion = false;
+      regionsToScan = new TreeMap<TabletFragmentInfo, String>();
+      for (TabletInfo tabletInfo : tabletInfos) {
+        TabletInfoImpl tabletInfoImpl = (TabletInfoImpl) tabletInfo;
+        if (!foundStartRegion 
+            && !isNullOrEmpty(subscanSpec.getStartRow())
+            && !tabletInfoImpl.containsRow(subscanSpec.getStartRow())) {
+          continue;
+        }
+        foundStartRegion = true;
+        regionsToScan.put(new TabletFragmentInfo(tabletInfoImpl), tabletInfo.getLocations()[0]);
+        if (!isNullOrEmpty(subscanSpec.getStopRow())
+            && tabletInfoImpl.containsRow(subscanSpec.getStopRow())) {
+          break;
+        }
+      }
+    } catch (Exception e) {
+      throw new DrillRuntimeException("Error getting region info for table: " + subscanSpec.getTableName(), e);
+    }
+  }
+
+  protected MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
+    MapRDBSubScanSpec spec = subscanSpec;
+    MapRDBSubScanSpec subScanSpec = new MapRDBSubScanSpec(
+        spec.getTableName(),
+        regionsToScan.get(tfi),
+        (!isNullOrEmpty(spec.getStartRow()) && tfi.containsRow(spec.getStartRow())) ? spec.getStartRow() : tfi.getStartKey(),
+        (!isNullOrEmpty(spec.getStopRow()) && tfi.containsRow(spec.getStopRow())) ? spec.getStopRow() : tfi.getEndKey(),
+        spec.getSerializedFilter(),
+        null);
+    return subScanSpec;
+  }
+
+  @Override
+  public MapRDBSubScan getSpecificScan(int minorFragmentId) {
+    assert minorFragmentId < endpointFragmentMapping.size() : String.format(
+        "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
+        minorFragmentId);
+    return new MapRDBSubScan(getUserName(), getStoragePlugin(), getStoragePlugin().getConfig(),
+        endpointFragmentMapping.get(minorFragmentId), columns, TABLE_JSON);
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    //TODO: look at stats for this.
+    long rowCount = (long) ((subscanSpec.getSerializedFilter() != null ? .5 : 1) * tableStats.getNumRows());
+    int avgColumnSize = 10;
+    int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new JsonTableGroupScan(this);
+  }
+
+  @JsonIgnore
+  public String getTableName() {
+    return subscanSpec.getTableName();
+  }
+
+  @Override
+  public String toString() {
+    return "JsonTableGroupScan [ScanSpec="
+        + subscanSpec + ", columns="
+        + columns + "]";
+  }
+
+  public MapRDBSubScanSpec getSubscanSpec() {
+    return subscanSpec;
+  }
+
+}


Mime
View raw message