drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [37/50] [abbrv] drill git commit: Refactoring code for better organization.
Date Tue, 13 Sep 2016 01:32:24 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
new file mode 100644
index 0000000..7fbcd1b
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -0,0 +1,569 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import static org.ojai.DocumentConstants.ID_KEY;
+import io.netty.buffer.DrillBuf;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
+import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
+import org.ojai.DocumentReader;
+import org.ojai.DocumentReader.EventType;
+import org.ojai.DocumentStream;
+import org.ojai.FieldPath;
+import org.ojai.FieldSegment;
+import org.ojai.Value;
+import org.ojai.store.QueryCondition;
+import org.ojai.types.OTime;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.mapr.db.MapRDB;
+import com.mapr.db.Table;
+import com.mapr.db.Table.TableOption;
+import com.mapr.db.exceptions.DBException;
+import com.mapr.db.impl.IdCodec;
+import com.mapr.db.ojai.DBDocumentReaderBase;
+import com.mapr.db.util.ByteBufs;
+import com.mapr.org.apache.hadoop.hbase.util.Bytes;
+
+public class MaprDBJsonRecordReader extends AbstractRecordReader {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MaprDBJsonRecordReader.class);
+
+  /** Schema path built from the document id key ({@code ID_KEY}). */
+  public static final SchemaPath ID_PATH = SchemaPath.getSimplePath(ID_KEY);
+
+  /** Milliseconds in one day; converts days-since-epoch values into epoch milliseconds. */
+  private static final long MILLISECONDS_IN_A_DAY = 24L * 60 * 60 * 1000;
+
+  // Table handle; obtained in setup() and released in close().
+  private Table table;
+  // Filter parsed from the sub-scan spec's serialized filter, or null when none was supplied.
+  private QueryCondition condition;
+  // Non-id fields handed to Table.find(); stays null when no such field is projected.
+  private FieldPath[] projectedFields;
+
+  private final String tableName;
+  private OperatorContext operatorContext;
+  private VectorContainerWriter writer;
+
+  // Scratch buffer used to stage string/binary bytes before they are written to a vector.
+  private DrillBuf buffer;
+
+  private DocumentStream documentStream;
+
+  private Iterator<DocumentReader> documentReaderIterators;
+
+  // True when the id field must be returned; drives the EXCLUDEID table option in setup().
+  private boolean includeId;
+  // True for a non-star query that projects no field other than the id (see Bug 21708).
+  private boolean idOnly;
+  private final boolean unionEnabled;
+  private final boolean readNumbersAsDouble;
+  private final boolean allTextMode;
+
+  public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
+      MapRDBFormatPluginConfig formatPluginConfig,
+      List<SchemaPath> projectedColumns, FragmentContext context) {
+    buffer = context.getManagedBuffer();
+    projectedFields = null;
+    tableName = Preconditions.checkNotNull(subScanSpec, "MapRDB reader needs a sub-scan spec").getTableName();
+    documentReaderIterators = null;
+    includeId = false;
+    idOnly    = false;
+    byte[] serializedFilter = subScanSpec.getSerializedFilter();
+    condition = null;
+
+    if (serializedFilter != null) {
+      condition = com.mapr.db.impl.ConditionImpl.parseFrom(ByteBufs.wrap(serializedFilter));
+    }
+
+    setColumns(projectedColumns);
+    unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+    readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble();
+    allTextMode = formatPluginConfig.isAllTextMode();
+  }
+
+  @Override
+  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) {
+    Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+    if (!isStarQuery()) {
+      Set<FieldPath> projectedFieldsSet = Sets.newTreeSet();
+      for (SchemaPath column : columns) {
+        if (column.getRootSegment().getPath().equalsIgnoreCase(ID_KEY)) {
+          /*
+           * we do not include _id field in the set of projected fields
+           * because the DB currently can not return a document if only
+           * the _id field was projected. This should really be fixed in
+           * the DB client (Bug 21708) to avoid transferring the entire
+           * document when only _id is requested.
+           */
+          // projectedFieldsList.add(ID_FIELD);
+          includeId = true;
+        } else {
+          projectedFieldsSet.add(getFieldPathForProjection(column));
+        }
+        transformed.add(column);
+      }
+      if (projectedFieldsSet.size() > 0) {
+        projectedFields = projectedFieldsSet.toArray(new FieldPath[projectedFieldsSet.size()]);
+      }
+    } else {
+      transformed.add(AbstractRecordReader.STAR_COLUMN);
+      includeId = true;
+    }
+
+    /*
+     * (Bug 21708) if we are projecting only the id field, save that condition here.
+     */
+    idOnly = !isStarQuery() && (projectedFields == null);
+    return transformed;
+  }
+
+  /**
+   * Opens the table and starts the document scan.
+   *
+   * @param context operator context, kept for wait-time statistics
+   * @param output mutator the vector writer is built on
+   * @throws ExecutionSetupException if the DB client reports a {@link DBException}
+   */
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    writer = new VectorContainerWriter(output, unionEnabled);
+    operatorContext = context;
+
+    try {
+      table = MapRDB.getTable(tableName);
+      // Ask the DB to drop the id unless the projection needs it.
+      final boolean excludeId = !includeId;
+      table.setOption(TableOption.EXCLUDEID, excludeId);
+      documentStream = table.find(condition, projectedFields);
+      documentReaderIterators = documentStream.documentReaders().iterator();
+    } catch (DBException dbe) {
+      throw new ExecutionSetupException(dbe);
+    }
+  }
+
+  /**
+   * Reads the next batch of documents into the output vectors.
+   *
+   * <p>At most {@code BaseValueVector.INITIAL_VALUE_ALLOCATION} documents are
+   * consumed per call. In id-only mode just the document id is written; an
+   * {@link IllegalStateException} or {@link IllegalArgumentException} raised
+   * while writing the id is logged as a possible schema change and the document
+   * is not counted.
+   *
+   * @return the number of records written in this batch
+   * @throws UserException if a document is malformed or holds an unsupported
+   *         type; the table path and document id are attached as context
+   * @throws UnsupportedOperationException if the id is neither STRING nor BINARY
+   */
+  @Override
+  public int next() {
+    Stopwatch watch = Stopwatch.createUnstarted();
+    watch.start();
+
+    writer.allocate();
+    writer.reset();
+
+    int recordCount = 0;
+    DBDocumentReaderBase reader = null;
+
+    while (recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
+      try {
+        reader = nextDocumentReader();
+        if (reader == null) {
+          break;
+        }
+        writer.setPosition(recordCount);
+        if (idOnly) {
+          Value id = reader.getId();
+          MapWriter map = writer.rootAsMap();
+
+          try {
+            switch (id.getType()) {
+            case STRING:
+              writeString(map.varChar(ID_KEY), id.getString());
+              recordCount++;
+              break;
+            case BINARY:
+              if (allTextMode) {
+                // Decode only the readable region of the buffer. ByteBuffer.array()
+                // would expose the whole backing array (ignoring position, limit and
+                // array offset) and fails for read-only or direct buffers.
+                writeString(map.varChar(ID_KEY),
+                    Charset.forName("UTF-8").decode(id.getBinary().duplicate()).toString());
+              } else {
+                writeBinary(map.varBinary(ID_KEY), id.getBinary());
+              }
+              recordCount++;
+              break;
+            default:
+              throw new UnsupportedOperationException(id.getType() +
+                  " is not a supported type for _id field.");
+            }
+          } catch (IllegalStateException | IllegalArgumentException e) {
+            logger.warn(String.format("Possible schema change at _id: '%s'",
+                IdCodec.asString(id)), e);
+          }
+        } else {
+          if (reader.next() != EventType.START_MAP) {
+            throw dataReadError("The document did not start with START_MAP!");
+          }
+          writeToMap(reader, writer.rootAsMap());
+          recordCount++;
+        }
+      } catch (UserException e) {
+        // Re-wrap so the failure carries the table path and the offending document id.
+        throw UserException.unsupportedError(e)
+            .addContext(String.format("Table: %s, document id: '%s'",
+                table.getPath(),
+                reader == null ? null : IdCodec.asString(reader.getId())))
+            .build(logger);
+      }
+    }
+
+    writer.setValueCount(recordCount);
+    logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), recordCount);
+    return recordCount;
+  }
+
+  /**
+   * Copies the fields of the map the reader is currently positioned in into
+   * {@code map}, recursing into nested maps and arrays, until the matching
+   * END_MAP event (or the end of the event stream) is reached.
+   *
+   * <p>In all-text mode scalar values are written as strings. An
+   * {@link IllegalStateException} or {@link IllegalArgumentException} raised
+   * while writing a field is logged as a possible schema change and that field
+   * is skipped.
+   *
+   * @param reader document reader positioned just after a START_MAP event
+   * @param map writer receiving the fields
+   * @throws UserException for DECIMAL and INTERVAL values, for an END_ARRAY
+   *         event inside a map, and for any unrecognised event type
+   */
+  private void writeToMap(DBDocumentReaderBase reader, MapWriter map) {
+    map.start();
+    outside: while (true) {
+      EventType event = reader.next();
+      if (event == null || event == EventType.END_MAP) {
+        break outside;
+      }
+
+      String fieldName = reader.getFieldName();
+      try {
+        switch (event) {
+        case NULL:
+          break; // not setting the field will leave it as null
+        case BINARY:
+          if (allTextMode) {
+            // Decode only the readable region of the buffer. ByteBuffer.array()
+            // would expose the whole backing array (ignoring position, limit and
+            // array offset) and fails for read-only or direct buffers.
+            writeString(map.varChar(fieldName),
+                Charset.forName("UTF-8").decode(reader.getBinary().duplicate()).toString());
+          } else {
+            writeBinary(map.varBinary(fieldName), reader.getBinary());
+          }
+          break;
+        case BOOLEAN:
+          if (allTextMode) {
+            writeString(map.varChar(fieldName), String.valueOf(reader.getBoolean()));
+          } else {
+            map.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0);
+          }
+          break;
+        case STRING:
+          writeString(map.varChar(fieldName), reader.getString());
+          break;
+        case BYTE:
+          if (allTextMode) {
+            writeString(map.varChar(fieldName), String.valueOf(reader.getByte()));
+          } else if (readNumbersAsDouble) {
+            map.float8(fieldName).writeFloat8(reader.getByte());
+          } else {
+            map.tinyInt(fieldName).writeTinyInt(reader.getByte());
+          }
+          break;
+        case SHORT:
+          if (allTextMode) {
+            writeString(map.varChar(fieldName), String.valueOf(reader.getShort()));
+          } else if (readNumbersAsDouble) {
+            map.float8(fieldName).writeFloat8(reader.getShort());
+          } else {
+            map.smallInt(fieldName).writeSmallInt(reader.getShort());
+          }
+          break;
+        case INT:
+          if (allTextMode) {
+            writeString(map.varChar(fieldName), String.valueOf(reader.getInt()));
+          } else if (readNumbersAsDouble) {
+            map.float8(fieldName).writeFloat8(reader.getInt());
+          } else {
+            map.integer(fieldName).writeInt(reader.getInt());
+          }
+          break;
+        case LONG:
+          if (allTextMode) {
+            writeString(map.varChar(fieldName), String.valueOf(reader.getLong()));
+          } else if (readNumbersAsDouble) {
+            map.float8(fieldName).writeFloat8(reader.getLong());
+          } else {
+            map.bigInt(fieldName).writeBigInt(reader.getLong());
+          }
+          break;
+        case FLOAT:
+          if (allTextMode) {
+            writeString(map.varChar(fieldName), String.valueOf(reader.getFloat()));
+          } else if (readNumbersAsDouble) {
+            map.float8(fieldName).writeFloat8(reader.getFloat());
+          } else {
+            map.float4(fieldName).writeFloat4(reader.getFloat());
+          }
+          break;
+        case DOUBLE:
+          if (allTextMode) {
+            writeString(map.varChar(fieldName), String.valueOf(reader.getDouble()));
+          } else {
+            map.float8(fieldName).writeFloat8(reader.getDouble());
+          }
+          break;
+        case DECIMAL:
+          throw unsupportedError("Decimal type is currently not supported.");
+        case DATE:
+          if (allTextMode) {
+            writeString(map.varChar(fieldName), reader.getDate().toString());
+          } else {
+            // The date vector takes epoch milliseconds; the document stores days since epoch.
+            long milliSecondsSinceEpoch = reader.getDate().toDaysSinceEpoch() * MILLISECONDS_IN_A_DAY;
+            map.date(fieldName).writeDate(milliSecondsSinceEpoch);
+          }
+          break;
+        case TIME:
+          if (allTextMode) {
+            writeString(map.varChar(fieldName), reader.getTime().toString());
+          } else {
+            OTime t = reader.getTime();
+            int h = t.getHour();
+            int m = t.getMinute();
+            int s = t.getSecond();
+            int ms = t.getMilliSecond();
+            int millisOfDay = ms + (s + ((m + (h * 60)) * 60)) * 1000;
+            map.time(fieldName).writeTime(millisOfDay);
+          }
+          break;
+        case TIMESTAMP:
+          if (allTextMode) {
+            writeString(map.varChar(fieldName), reader.getTimestamp().toString());
+          } else {
+            map.timeStamp(fieldName).writeTimeStamp(reader.getTimestampLong());
+          }
+          break;
+        case INTERVAL:
+          throw unsupportedError("Interval type is currently not supported.");
+        case START_MAP:
+          writeToMap(reader, map.map(fieldName));
+          break;
+        case START_ARRAY:
+          writeToList(reader, map.list(fieldName));
+          break;
+        case END_ARRAY:
+          throw dataReadError("Encountered an END_ARRAY event inside a map.");
+        default:
+          throw unsupportedError("Unsupported type: %s encountered during the query.", event);
+        }
+      } catch (IllegalStateException | IllegalArgumentException e) {
+        logger.warn(String.format("Possible schema change at _id: '%s', field: '%s'",
+            IdCodec.asString(reader.getId()), fieldName), e);
+      }
+    }
+    map.end();
+  }
+
+  /**
+   * Copies the elements of the array the reader is currently positioned in into
+   * {@code list}, recursing into nested maps and arrays, until the matching
+   * END_ARRAY event (or the end of the event stream) is reached.
+   *
+   * <p>In all-text mode scalar values are written as strings.
+   *
+   * @param reader document reader positioned just after a START_ARRAY event
+   * @param list writer receiving the elements
+   * @throws UserException for NULL, DECIMAL and INTERVAL elements, for an
+   *         END_MAP event inside a list, and for any unrecognised event type
+   */
+  private void writeToList(DBDocumentReaderBase reader, ListWriter list) {
+    list.startList();
+    outside: while (true) {
+      EventType event = reader.next();
+      if (event == null || event == EventType.END_ARRAY) {
+        break outside;
+      }
+
+      switch (event) {
+      case NULL:
+        throw unsupportedError("Null values are not supported in lists.");
+      case BINARY:
+        if (allTextMode) {
+          // Decode only the readable region of the buffer. ByteBuffer.array()
+          // would expose the whole backing array (ignoring position, limit and
+          // array offset) and fails for read-only or direct buffers.
+          writeString(list.varChar(),
+              Charset.forName("UTF-8").decode(reader.getBinary().duplicate()).toString());
+        } else {
+          writeBinary(list.varBinary(), reader.getBinary());
+        }
+        break;
+      case BOOLEAN:
+        if (allTextMode) {
+          writeString(list.varChar(), String.valueOf(reader.getBoolean()));
+        } else {
+          list.bit().writeBit(reader.getBoolean() ? 1 : 0);
+        }
+        break;
+      case STRING:
+        writeString(list.varChar(), reader.getString());
+        break;
+      case BYTE:
+        if (allTextMode) {
+          writeString(list.varChar(), String.valueOf(reader.getByte()));
+        } else if (readNumbersAsDouble) {
+          list.float8().writeFloat8(reader.getByte());
+        } else {
+          list.tinyInt().writeTinyInt(reader.getByte());
+        }
+        break;
+      case SHORT:
+        if (allTextMode) {
+          writeString(list.varChar(), String.valueOf(reader.getShort()));
+        } else if (readNumbersAsDouble) {
+          list.float8().writeFloat8(reader.getShort());
+        } else {
+          list.smallInt().writeSmallInt(reader.getShort());
+        }
+        break;
+      case INT:
+        if (allTextMode) {
+          writeString(list.varChar(), String.valueOf(reader.getInt()));
+        } else if (readNumbersAsDouble) {
+          list.float8().writeFloat8(reader.getInt());
+        } else {
+          list.integer().writeInt(reader.getInt());
+        }
+        break;
+      case LONG:
+        if (allTextMode) {
+          writeString(list.varChar(), String.valueOf(reader.getLong()));
+        } else if (readNumbersAsDouble) {
+          list.float8().writeFloat8(reader.getLong());
+        } else {
+          list.bigInt().writeBigInt(reader.getLong());
+        }
+        break;
+      case FLOAT:
+        if (allTextMode) {
+          writeString(list.varChar(), String.valueOf(reader.getFloat()));
+        } else if (readNumbersAsDouble) {
+          list.float8().writeFloat8(reader.getFloat());
+        } else {
+          list.float4().writeFloat4(reader.getFloat());
+        }
+        break;
+      case DOUBLE:
+        if (allTextMode) {
+          writeString(list.varChar(), String.valueOf(reader.getDouble()));
+        } else {
+          list.float8().writeFloat8(reader.getDouble());
+        }
+        break;
+      case DECIMAL:
+        throw unsupportedError("Decimals are currently not supported.");
+      case DATE:
+        if (allTextMode) {
+          writeString(list.varChar(), reader.getDate().toString());
+        } else {
+          // The date vector takes epoch milliseconds; the document stores days since epoch.
+          long milliSecondsSinceEpoch = reader.getDate().toDaysSinceEpoch() * MILLISECONDS_IN_A_DAY;
+          list.date().writeDate(milliSecondsSinceEpoch);
+        }
+        break;
+      case TIME:
+        if (allTextMode) {
+          writeString(list.varChar(), reader.getTime().toString());
+        } else {
+          OTime t = reader.getTime();
+          int h = t.getHour();
+          int m = t.getMinute();
+          int s = t.getSecond();
+          int ms = t.getMilliSecond();
+          int millisOfDay = ms + (s + ((m + (h * 60)) * 60)) * 1000;
+          list.time().writeTime(millisOfDay);
+        }
+        break;
+      case TIMESTAMP:
+        if (allTextMode) {
+          writeString(list.varChar(), reader.getTimestamp().toString());
+        } else {
+          list.timeStamp().writeTimeStamp(reader.getTimestampLong());
+        }
+        break;
+      case INTERVAL:
+        throw unsupportedError("Interval is currently not supported.");
+      case START_MAP:
+        writeToMap(reader, list.map());
+        break;
+      case END_MAP:
+        throw dataReadError("Encountered an END_MAP event inside a list.");
+      case START_ARRAY:
+        writeToList(reader, list.list());
+        break;
+      default:
+        // Exactly one %s for the one argument: a second, unmatched specifier makes
+        // String.format throw MissingFormatArgumentException instead of producing
+        // the intended UserException.
+        throw unsupportedError("Unsupported type: %s encountered during the query.", event);
+      }
+    }
+    list.endList();
+  }
+
+  private void writeBinary(VarBinaryWriter binaryWriter, ByteBuffer buf) {
+    buffer = buffer.reallocIfNeeded(buf.remaining());
+    buffer.setBytes(0, buf, buf.position(), buf.remaining());
+    binaryWriter.writeVarBinary(0, buf.remaining(), buffer);
+  }
+
+  /**
+   * Converts {@code string} to bytes with {@code Bytes.toBytes}, stages them in
+   * the scratch buffer and writes them through {@code varCharWriter}.
+   *
+   * @param varCharWriter destination writer
+   * @param string text to write
+   */
+  private void writeString(VarCharWriter varCharWriter, String string) {
+    final byte[] encoded = Bytes.toBytes(string);
+    final int length = encoded.length;
+    buffer = buffer.reallocIfNeeded(length);
+    buffer.setBytes(0, encoded);
+    varCharWriter.writeVarChar(0, length, buffer);
+  }
+
+  private UserException unsupportedError(String format, Object... args) {
+    return UserException.unsupportedError()
+        .message(String.format(format, args))
+        .build(logger);
+  }
+
+  private UserException dataReadError(String format, Object... args) {
+    return UserException.dataReadError()
+        .message(String.format(format, args))
+        .build(logger);
+  }
+
+  /**
+   * Fetches the next document reader from the scan, bracketing the call with
+   * the operator's wait-time statistics when an operator context is available.
+   *
+   * @return the next reader, or {@code null} when the scan is exhausted
+   * @throws UserException (data read error) if the DB client raises a {@link DBException}
+   */
+  private DBDocumentReaderBase nextDocumentReader() {
+    final OperatorStats stats = (operatorContext != null) ? operatorContext.getStats() : null;
+    final boolean trackWait = (stats != null);
+    try {
+      if (trackWait) {
+        stats.startWait();
+      }
+      try {
+        return documentReaderIterators.hasNext()
+            ? (DBDocumentReaderBase) documentReaderIterators.next()
+            : null;
+      } finally {
+        if (trackWait) {
+          stats.stopWait();
+        }
+      }
+    } catch (DBException dbe) {
+      throw UserException.dataReadError(dbe).build(logger);
+    }
+  }
+
+  /*
+   * Extracts contiguous named segments from the SchemaPath, starting from the
+   * root segment and build the FieldPath from it for projection.
+   *
+   * This is due to bug 22726 and 22727, which cause DB's DocumentReaders to
+   * behave incorrectly for sparse lists, hence we avoid projecting beyond the
+   * first encountered ARRAY field and let Drill handle the projection.
+   */
+  private static FieldPath getFieldPathForProjection(SchemaPath column) {
+    Stack<PathSegment.NameSegment> pathSegments = new Stack<PathSegment.NameSegment>();
+    PathSegment seg = column.getRootSegment();
+    while (seg != null && seg.isNamed()) {
+      pathSegments.push((PathSegment.NameSegment) seg);
+      seg = seg.getChild();
+    }
+    FieldSegment.NameSegment child = null;
+    while (!pathSegments.isEmpty()) {
+      child = new FieldSegment.NameSegment(pathSegments.pop().getPath(), child, false);
+    }
+    return new FieldPath(child);
+  }
+
+  /**
+   * Releases the document stream and the table handle, if they were opened.
+   * The table is closed even when closing the stream throws.
+   */
+  @Override
+  public void close() {
+    try {
+      if (documentStream != null) {
+        documentStream.close();
+      }
+    } finally {
+      // Runs even if the stream failed to close, so the table handle is not leaked.
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/util/CommonFns.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/util/CommonFns.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/util/CommonFns.java
new file mode 100644
index 0000000..a7b8cd1
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/util/CommonFns.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.util;
+
+/** Small helper functions shared by the MapR-DB format plugin code. */
+public class CommonFns {
+
+  /**
+   * Tells whether a key carries no bytes.
+   *
+   * @param key byte array to inspect; may be {@code null}
+   * @return {@code true} if {@code key} is {@code null} or has length zero
+   */
+  public static boolean isNullOrEmpty(final byte[] key) {
+    if (key == null) {
+      return true;
+    }
+    return key.length == 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatMatcher.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatMatcher.java
new file mode 100644
index 0000000..47e9927
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatMatcher.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.streams;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.store.mapr.TableFormatMatcher;
+import org.apache.drill.exec.store.mapr.TableFormatPlugin;
+
+import com.mapr.fs.MapRFileStatus;
+
+/**
+ * Format matcher for the streams format plugin: accepts a table only when its
+ * table properties flag it as a "Marlin" table (presumably the internal name
+ * for MapR Streams; confirm against the MapR-FS client documentation).
+ */
+public class StreamsFormatMatcher extends TableFormatMatcher {
+
+  /**
+   * @param plugin the format plugin this matcher belongs to
+   */
+  public StreamsFormatMatcher(TableFormatPlugin plugin) {
+    super(plugin);
+  }
+
+  /**
+   * Looks up the table properties for the path of {@code status} and reports
+   * the value of their "is Marlin table" attribute.
+   *
+   * @param status file status of the candidate table
+   * @return {@code true} if the table properties mark the table as a Marlin table
+   * @throws IOException declared by the overridden method; presumably raised
+   *         when the table properties cannot be read
+   */
+  @Override
+  protected boolean isSupportedTable(MapRFileStatus status) throws IOException {
+    return getFormatPlugin()
+        .getMaprFS()
+        .getTableProperties(status.getPath())
+        .getAttr()
+        .getIsMarlinTable();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
new file mode 100644
index 0000000..811e245
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.streams;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.mapr.TableFormatPlugin;
+import org.apache.hadoop.conf.Configuration;
+
+public class StreamsFormatPlugin extends TableFormatPlugin {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamsFormatPlugin.class);
+  private StreamsFormatMatcher matcher;
+
+  public StreamsFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+      StoragePluginConfig storageConfig, StreamsFormatPluginConfig formatConfig) {
+    super(name, context, fsConf, storageConfig, formatConfig);
+    matcher = new StreamsFormatMatcher(this);
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  @Override
+  public boolean supportsAutoPartitioning() {
+    return false;
+  }
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return matcher;
+  }
+
+  @Override
+  public AbstractWriter getWriter(PhysicalOperator child, String location,
+      List<String> partitionColumns) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+      List<SchemaPath> columns) throws IOException {
+    List<String> files = selection.getFiles();
+    assert (files.size() == 1);
+    //TableProperties props = getMaprFS().getTableProperties(new Path(files.get(0)));
+    throw new UnsupportedOperationException("not implemented");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPluginConfig.java
new file mode 100644
index 0000000..b061f03
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPluginConfig.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.streams;
+
+import org.apache.drill.exec.store.mapr.TableFormatPluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Configuration for the "streams" format plugin. It has no properties of its
+ * own yet, so every instance hashes to the same value and {@link #impEquals}
+ * reports every compared object as equal.
+ */
+@JsonTypeName("streams")
+@JsonInclude(Include.NON_DEFAULT)
+public class StreamsFormatPluginConfig extends TableFormatPluginConfig {
+
+  /** @return a fixed value, since there is no state to hash */
+  @Override
+  public int hashCode() {
+    return 47;
+  }
+
+  /** @return always {@code true}, since there are no custom properties to compare */
+  @Override
+  protected boolean impEquals(Object obj) {
+    // TODO: compare custom properties once added
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java
deleted file mode 100644
index fa084f0..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.planner.logical.DynamicDrillTable;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.dfs.FormatMatcher;
-import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.hadoop.fs.FileStatus;
-
-import com.mapr.fs.MapRFileStatus;
-
-public class MapRDBFormatMatcher extends FormatMatcher {
-
-  private final FormatPlugin plugin;
-
-  public MapRDBFormatMatcher(FormatPlugin plugin) {
-    this.plugin = plugin;
-  }
-
-  @Override
-  public boolean supportDirectoryReads() {
-    return false;
-  }
-
-  public DrillTable isReadable(DrillFileSystem fs,
-      FileSelection selection, FileSystemPlugin fsPlugin,
-      String storageEngineName, String userName) throws IOException {
-    FileStatus status = selection.getFirstPath(fs);
-    if (!isFileReadable(fs, status)) {
-      return null;
-    }
-
-    return new DynamicDrillTable(fsPlugin, storageEngineName, userName,
-        new FormatSelection(getFormatPlugin().getConfig(), selection));
-  }
-
-  @Override
-  public boolean isFileReadable(DrillFileSystem fs, FileStatus status) throws IOException {
-    return (status instanceof MapRFileStatus) &&  ((MapRFileStatus) status).isTable();
-  }
-
-  @Override
-  public FormatPlugin getFormatPlugin() {
-    return plugin;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
deleted file mode 100644
index 0694f5b..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import static com.mapr.fs.jni.MapRConstants.MAPRFS_PREFIX;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.AbstractWriter;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.dfs.FormatMatcher;
-import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.hbase.HBaseScanSpec;
-import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan;
-import org.apache.drill.exec.store.maprdb.json.JsonScanSpec;
-import org.apache.drill.exec.store.maprdb.json.JsonTableGroupScan;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.collect.ImmutableSet;
-import com.mapr.fs.MapRFileSystem;
-import com.mapr.fs.tables.TableProperties;
-
-public class MapRDBFormatPlugin implements FormatPlugin {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
-      .getLogger(MapRDBFormatPlugin.class);
-
-  private final FileSystemConfig storageConfig;
-  private final MapRDBFormatPluginConfig config;
-  private final MapRDBFormatMatcher matcher;
-  private final Configuration fsConf;
-  private final DrillbitContext context;
-  private final String name;
-
-  private volatile FileSystemPlugin storagePlugin;
-  private volatile MapRFileSystem maprfs;
-
-  public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
-      StoragePluginConfig storageConfig) {
-    this(name, context, fsConf, storageConfig, new MapRDBFormatPluginConfig());
-  }
-
-  public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
-      StoragePluginConfig storageConfig, MapRDBFormatPluginConfig formatConfig) {
-    this.context = context;
-    this.config = formatConfig;
-    this.matcher = new MapRDBFormatMatcher(this);
-    this.storageConfig = (FileSystemConfig) storageConfig;
-    this.fsConf = fsConf;
-    this.name = name == null ? "maprdb" : name;
-    try {
-      this.maprfs = new MapRFileSystem();
-      maprfs.initialize(new URI(MAPRFS_PREFIX), fsConf);
-    } catch (IOException | URISyntaxException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public boolean supportsRead() {
-    return true;
-  }
-
-  @Override
-  public boolean supportsWrite() {
-    return false;
-  }
-
-  @Override
-  public boolean supportsAutoPartitioning() {
-    return false;
-  }
-
-  @Override
-  public FormatMatcher getMatcher() {
-    return matcher;
-  }
-
-  public Configuration getFsConf() {
-    return fsConf;
-  }
-
-  @Override
-  public AbstractWriter getWriter(PhysicalOperator child, String location,
-      List<String> partitionColumns) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  @JsonIgnore
-  public Set<StoragePluginOptimizerRule> getOptimizerRules() {
-    return ImmutableSet.of(MapRDBPushFilterIntoScan.FILTER_ON_SCAN, MapRDBPushFilterIntoScan.FILTER_ON_PROJECT);
-  }
-
-  @Override
-  public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
-      List<SchemaPath> columns) throws IOException {
-    List<String> files = selection.getFiles();
-    assert (files.size() == 1);
-    String tableName = files.get(0);
-    TableProperties props = maprfs.getTableProperties(new Path(tableName));
-
-    if (props.getAttr().getJson()) {
-      JsonScanSpec scanSpec = new JsonScanSpec(tableName, null/*condition*/);
-      return new JsonTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
-    } else {
-      HBaseScanSpec scanSpec = new HBaseScanSpec(tableName);
-      return new BinaryTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
-    }
-  }
-
-  @Override
-  public FormatPluginConfig getConfig() {
-    return config;
-  }
-
-  @Override
-  public StoragePluginConfig getStorageConfig() {
-    return storageConfig;
-  }
-
-  @Override
-  public DrillbitContext getContext() {
-    return context;
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  public synchronized FileSystemPlugin getStoragePlugin() {
-    if (this.storagePlugin == null) {
-      try {
-        this.storagePlugin = (FileSystemPlugin) (context.getStorage().getPlugin(storageConfig));
-      } catch (ExecutionSetupException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    return storagePlugin;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPluginConfig.java
deleted file mode 100644
index eb341d9..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPluginConfig.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import org.apache.drill.common.logical.FormatPluginConfig;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("maprdb")  @JsonInclude(Include.NON_DEFAULT)
-public class MapRDBFormatPluginConfig implements FormatPluginConfig {
-
-  private boolean allTextMode = false;
-  private boolean readAllNumbersAsDouble = false;
-
-  @Override
-  public int hashCode() {
-    return 53;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-
-    if (obj == null) {
-      return false;
-    }
-
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-
-    MapRDBFormatPluginConfig other = (MapRDBFormatPluginConfig)obj;
-
-    if (readAllNumbersAsDouble != other.readAllNumbersAsDouble) {
-      return false;
-    }
-
-    if (allTextMode != other.allTextMode) {
-      return false;
-    }
-
-    return true;
-  }
-
-  public boolean isReadAllNumbersAsDouble() {
-    return readAllNumbersAsDouble;
-  }
-
-  public boolean isAllTextMode() {
-    return allTextMode;
-  }
-
-  @JsonProperty("allTextMode")
-  public void setAllTextMode(boolean mode) {
-    allTextMode = mode;
-  }
-
-  @JsonProperty("readAllNumbersAsDouble")
-  public void setReadAllNumbersAsDouble(boolean read) {
-    readAllNumbersAsDouble = read;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
deleted file mode 100644
index 393bfe5..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-public abstract class MapRDBGroupScan extends AbstractGroupScan {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);
-
-  private FileSystemPlugin storagePlugin;
-
-  private MapRDBFormatPlugin formatPlugin;
-
-  protected MapRDBFormatPluginConfig formatPluginConfig;
-
-  protected List<SchemaPath> columns;
-
-  protected Map<Integer, List<MapRDBSubScanSpec>> endpointFragmentMapping;
-
-  protected NavigableMap<TabletFragmentInfo, String> regionsToScan;
-
-  private boolean filterPushedDown = false;
-
-  private Stopwatch watch = Stopwatch.createUnstarted();
-
-  private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MapRDBSubScanSpec>>() {
-    @Override
-    public int compare(List<MapRDBSubScanSpec> list1, List<MapRDBSubScanSpec> list2) {
-      return list1.size() - list2.size();
-    }
-  };
-
-  private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
-
-  public MapRDBGroupScan(MapRDBGroupScan that) {
-    super(that);
-    this.columns = that.columns;
-    this.formatPlugin = that.formatPlugin;
-    this.formatPluginConfig = that.formatPluginConfig;
-    this.storagePlugin = that.storagePlugin;
-    this.regionsToScan = that.regionsToScan;
-    this.filterPushedDown = that.filterPushedDown;
-  }
-
-  public MapRDBGroupScan(FileSystemPlugin storagePlugin,
-      MapRDBFormatPlugin formatPlugin, List<SchemaPath> columns, String userName) {
-    super(userName);
-    this.storagePlugin = storagePlugin;
-    this.formatPlugin = formatPlugin;
-    this.formatPluginConfig = (MapRDBFormatPluginConfig)formatPlugin.getConfig();
-    this.columns = columns;
-  }
-
-  @Override
-  public List<EndpointAffinity> getOperatorAffinity() {
-    watch.reset();
-    watch.start();
-    Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, DrillbitEndpoint>();
-    for (DrillbitEndpoint ep : formatPlugin.getContext().getBits()) {
-      endpointMap.put(ep.getAddress(), ep);
-    }
-
-    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
-    for (String serverName : regionsToScan.values()) {
-      DrillbitEndpoint ep = endpointMap.get(serverName);
-      if (ep != null) {
-        EndpointAffinity affinity = affinityMap.get(ep);
-        if (affinity == null) {
-          affinityMap.put(ep, new EndpointAffinity(ep, 1));
-        } else {
-          affinity.addAffinity(1);
-        }
-      }
-    }
-    logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS)/1000);
-    return Lists.newArrayList(affinityMap.values());
-  }
-
-  /**
-   *
-   * @param incomingEndpoints
-   */
-  @Override
-  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
-    watch.reset();
-    watch.start();
-
-    final int numSlots = incomingEndpoints.size();
-    Preconditions.checkArgument(numSlots <= regionsToScan.size(),
-        String.format("Incoming endpoints %d is greater than number of scan regions %d", numSlots, regionsToScan.size()));
-
-    /*
-     * Minimum/Maximum number of assignment per slot
-     */
-    final int minPerEndpointSlot = (int) Math.floor((double)regionsToScan.size() / numSlots);
-    final int maxPerEndpointSlot = (int) Math.ceil((double)regionsToScan.size() / numSlots);
-
-    /*
-     * initialize (endpoint index => HBaseSubScanSpec list) map
-     */
-    endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots);
-
-    /*
-     * another map with endpoint (hostname => corresponding index list) in 'incomingEndpoints' list
-     */
-    Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
-
-    /*
-     * Initialize these two maps
-     */
-    for (int i = 0; i < numSlots; ++i) {
-      endpointFragmentMapping.put(i, new ArrayList<MapRDBSubScanSpec>(maxPerEndpointSlot));
-      String hostname = incomingEndpoints.get(i).getAddress();
-      Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
-      if (hostIndexQueue == null) {
-        hostIndexQueue = Lists.newLinkedList();
-        endpointHostIndexListMap.put(hostname, hostIndexQueue);
-      }
-      hostIndexQueue.add(i);
-    }
-
-    Set<Entry<TabletFragmentInfo, String>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
-
-    /*
-     * First, we assign regions which are hosted on region servers running on drillbit endpoints
-     */
-    for (Iterator<Entry<TabletFragmentInfo, String>> regionsIterator = regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) {
-      Entry<TabletFragmentInfo, String> regionEntry = regionsIterator.next();
-      /*
-       * Test if there is a drillbit endpoint which is also an HBase RegionServer that hosts the current HBase region
-       */
-      Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue());
-      if (endpointIndexlist != null) {
-        Integer slotIndex = endpointIndexlist.poll();
-        List<MapRDBSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
-        endpointSlotScanList.add(getSubScanSpec(regionEntry.getKey()));
-        // add to the tail of the slot list, to add more later in round robin fashion
-        endpointIndexlist.offer(slotIndex);
-        // this region has been assigned
-        regionsIterator.remove();
-      }
-    }
-
-    /*
-     * Build priority queues of slots, with ones which has tasks lesser than 'minPerEndpointSlot' and another which have more.
-     */
-    PriorityQueue<List<MapRDBSubScanSpec>> minHeap = new PriorityQueue<List<MapRDBSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
-    PriorityQueue<List<MapRDBSubScanSpec>> maxHeap = new PriorityQueue<List<MapRDBSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
-    for(List<MapRDBSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
-      if (listOfScan.size() < minPerEndpointSlot) {
-        minHeap.offer(listOfScan);
-      } else if (listOfScan.size() > minPerEndpointSlot){
-        maxHeap.offer(listOfScan);
-      }
-    }
-
-    /*
-     * Now, let's process any regions which remain unassigned and assign them to slots with minimum number of assignments.
-     */
-    if (regionsToAssignSet.size() > 0) {
-      for (Entry<TabletFragmentInfo, String> regionEntry : regionsToAssignSet) {
-        List<MapRDBSubScanSpec> smallestList = minHeap.poll();
-        smallestList.add(getSubScanSpec(regionEntry.getKey()));
-        if (smallestList.size() < maxPerEndpointSlot) {
-          minHeap.offer(smallestList);
-        }
-      }
-    }
-
-    /*
-     * While there are slots with lesser than 'minPerEndpointSlot' unit work, balance from those with more.
-     */
-    while(minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
-      List<MapRDBSubScanSpec> smallestList = (List<MapRDBSubScanSpec>) minHeap.poll();
-      List<MapRDBSubScanSpec> largestList = (List<MapRDBSubScanSpec>) maxHeap.poll();
-      smallestList.add(largestList.remove(largestList.size()-1));
-      if (largestList.size() > minPerEndpointSlot) {
-        maxHeap.offer(largestList);
-      }
-      if (smallestList.size() < minPerEndpointSlot) {
-        minHeap.offer(smallestList);
-      }
-    }
-
-    /* no slot should be empty at this point */
-    assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format(
-        "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment Map: {}.",
-        incomingEndpoints, endpointFragmentMapping.toString());
-
-    logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}",
-        watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString());
-  }
-
-  @Override
-  public int getMaxParallelizationWidth() {
-    return regionsToScan.size();
-  }
-
-  @JsonIgnore
-  public MapRDBFormatPlugin getFormatPlugin() {
-    return formatPlugin;
-  }
-
-  @Override
-  public String getDigest() {
-    return toString();
-  }
-
-  @JsonProperty("storage")
-  public FileSystemConfig getStorageConfig() {
-    return (FileSystemConfig) storagePlugin.getConfig();
-  }
-
-  @JsonIgnore
-  public FileSystemPlugin getStoragePlugin(){
-    return storagePlugin;
-  }
-
-  @JsonProperty
-  public List<SchemaPath> getColumns() {
-    return columns;
-  }
-
-  @JsonIgnore
-  public boolean canPushdownProjects(List<SchemaPath> columns) {
-    return true;
-  }
-
-  @JsonIgnore
-  public void setFilterPushedDown(boolean b) {
-    this.filterPushedDown = true;
-  }
-
-  @JsonIgnore
-  public boolean isFilterPushedDown() {
-    return filterPushedDown;
-  }
-
-  protected abstract MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo key);
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
deleted file mode 100644
index c0a33bf..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rex.RexNode;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.exec.planner.logical.DrillOptiq;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.planner.physical.FilterPrel;
-import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.planner.physical.ProjectPrel;
-import org.apache.drill.exec.planner.physical.ScanPrel;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.hbase.HBaseScanSpec;
-import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan;
-import org.apache.drill.exec.store.maprdb.binary.MapRDBFilterBuilder;
-import org.apache.drill.exec.store.maprdb.json.JsonConditionBuilder;
-import org.apache.drill.exec.store.maprdb.json.JsonScanSpec;
-import org.apache.drill.exec.store.maprdb.json.JsonTableGroupScan;
-
-import com.google.common.collect.ImmutableList;
-
-public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class);
-
-  private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) {
-    super(operand, description);
-  }
-
-  public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan:Filter_On_Scan") {
-
-    @Override
-    public void onMatch(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel) call.rel(1);
-      final FilterPrel filter = (FilterPrel) call.rel(0);
-      final RexNode condition = filter.getCondition();
-
-      if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
-        BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan();
-        doPushFilterIntoBinaryGroupScan(call, filter, null, scan, groupScan, condition);
-      } else {
-        assert(scan.getGroupScan() instanceof JsonTableGroupScan);
-        JsonTableGroupScan groupScan = (JsonTableGroupScan)scan.getGroupScan();
-        doPushFilterIntoJsonGroupScan(call, filter, null, scan, groupScan, condition);
-      }
-    }
-
-    @Override
-    public boolean matches(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel) call.rel(1);
-      if (scan.getGroupScan() instanceof BinaryTableGroupScan ||
-          scan.getGroupScan() instanceof JsonTableGroupScan) {
-        return super.matches(call);
-      }
-      return false;
-    }
-  };
-
-  public static final StoragePluginOptimizerRule FILTER_ON_PROJECT = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))), "MapRDBPushFilterIntoScan:Filter_On_Project") {
-
-    @Override
-    public void onMatch(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel) call.rel(2);
-      final ProjectPrel project = (ProjectPrel) call.rel(1);
-      final FilterPrel filter = (FilterPrel) call.rel(0);
-
-      // convert the filter to one that references the child of the project
-      final RexNode condition =  RelOptUtil.pushFilterPastProject(filter.getCondition(), project);
-
-      if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
-        BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan();
-        doPushFilterIntoBinaryGroupScan(call, filter, project, scan, groupScan, condition);
-      } else {
-        assert(scan.getGroupScan() instanceof JsonTableGroupScan);
-        JsonTableGroupScan groupScan = (JsonTableGroupScan)scan.getGroupScan();
-        doPushFilterIntoJsonGroupScan(call, filter, project, scan, groupScan, condition);
-      }
-    }
-
-    @Override
-    public boolean matches(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel) call.rel(2);
-      if (scan.getGroupScan() instanceof BinaryTableGroupScan ||
-          scan.getGroupScan() instanceof JsonTableGroupScan) {
-        return super.matches(call);
-      }
-      return false;
-    }
-  };
-
-  protected void doPushFilterIntoJsonGroupScan(RelOptRuleCall call,
-      FilterPrel filter, final ProjectPrel project, ScanPrel scan,
-      JsonTableGroupScan groupScan, RexNode condition) {
-
-    if (groupScan.isFilterPushedDown()) {
-      /*
-       * The rule can get triggered again due to the transformed "scan => filter" sequence
-       * created by the earlier execution of this rule when we could not do a complete
-       * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
-       * this flag to not do a re-processing of the rule on the already transformed call.
-       */
-      return;
-    }
-
-    LogicalExpression conditionExp = null;
-    try {
-      conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
-    } catch (ClassCastException e) {
-      // MD-771 bug in DrillOptiq.toDrill() causes filter condition on ITEM operator to throw ClassCastException
-      // For such cases, we return without pushdown
-      return;
-    }
-    final JsonConditionBuilder jsonConditionBuilder = new JsonConditionBuilder(groupScan, conditionExp);
-    final JsonScanSpec newScanSpec = jsonConditionBuilder.parseTree();
-    if (newScanSpec == null) {
-      return; //no filter pushdown ==> No transformation.
-    }
-
-    final JsonTableGroupScan newGroupsScan = new JsonTableGroupScan(groupScan.getUserName(),
-                                                                    groupScan.getStoragePlugin(),
-                                                                    groupScan.getFormatPlugin(),
-                                                                    newScanSpec,
-                                                                    groupScan.getColumns());
-    newGroupsScan.setFilterPushedDown(true);
-
-    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
-
-    // Depending on whether is a project in the middle, assign either scan or copy of project to childRel.
-    final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));;
-
-    if (jsonConditionBuilder.isAllExpressionsConverted()) {
-        /*
-         * Since we could convert the entire filter condition expression into an HBase filter,
-         * we can eliminate the filter operator altogether.
-         */
-      call.transformTo(childRel);
-    } else {
-      call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
-    }
-  }
-
-  protected void doPushFilterIntoBinaryGroupScan(final RelOptRuleCall call,
-                                                 final FilterPrel filter,
-                                                 final ProjectPrel project,
-                                                 final ScanPrel scan,
-                                                 final BinaryTableGroupScan groupScan,
-                                                 final RexNode condition) {
-
-    if (groupScan.isFilterPushedDown()) {
-      /*
-       * The rule can get triggered again due to the transformed "scan => filter" sequence
-       * created by the earlier execution of this rule when we could not do a complete
-       * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
-       * this flag to not do a re-processing of the rule on the already transformed call.
-       */
-      return;
-    }
-
-    final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
-    final MapRDBFilterBuilder maprdbFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp);
-    final HBaseScanSpec newScanSpec = maprdbFilterBuilder.parseTree();
-    if (newScanSpec == null) {
-      return; //no filter pushdown ==> No transformation.
-    }
-
-    final BinaryTableGroupScan newGroupsScan = new BinaryTableGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
-                                                              groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns());
-    newGroupsScan.setFilterPushedDown(true);
-
-    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
-
-    // Depending on whether is a project in the middle, assign either scan or copy of project to childRel.
-    final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));;
-
-    if (maprdbFilterBuilder.isAllExpressionsConverted()) {
-        /*
-         * Since we could convert the entire filter condition expression into an HBase filter,
-         * we can eliminate the filter operator altogether.
-         */
-      call.transformTo(childRel);
-    } else {
-      call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
deleted file mode 100644
index 1cd33ca..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.hbase.HBaseRecordReader;
-import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
-import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan;
-import org.apache.drill.exec.store.maprdb.json.MaprDBJsonRecordReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class);
-
-  /**
-   * Builds a {@link ScanBatch} with one record reader per region scan spec of the sub-scan.
-   *
-   * @param context  fragment context handed to every reader and to the batch
-   * @param subScan  sub-scan supplying the region scan specs, columns, table type and format config
-   * @param children must be empty
-   * @return a scan batch iterating over the created readers
-   * @throws ExecutionSetupException wrapping any exception raised while creating a reader
-   */
-  @Override
-  public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
-    Preconditions.checkArgument(children.isEmpty());
-    final List<RecordReader> recordReaders = Lists.newArrayList();
-    final Configuration hbaseConf = HBaseConfiguration.create();
-    for (MapRDBSubScanSpec regionSpec : subScan.getRegionScanSpecList()) {
-      try {
-        recordReaders.add(newReader(hbaseConf, regionSpec, subScan, context));
-      } catch (Exception cause) {
-        throw new ExecutionSetupException(cause);
-      }
-    }
-    return new ScanBatch(subScan, context, recordReaders.iterator());
-  }
-
-  // Binary tables are read through the HBase reader; every other table type through the JSON reader.
-  private RecordReader newReader(Configuration hbaseConf, MapRDBSubScanSpec regionSpec, MapRDBSubScan subScan,
-      FragmentContext context) throws Exception {
-    if (!BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) {
-      return new MaprDBJsonRecordReader(regionSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context);
-    }
-    // Re-express the MapR-DB spec as an HBase sub-scan spec; the last argument is passed as null as before.
-    final HBaseSubScanSpec hbaseSpec = new HBaseSubScanSpec(regionSpec.getTableName(), regionSpec.getRegionServer(),
-        regionSpec.getStartRow(), regionSpec.getStopRow(), regionSpec.getSerializedFilter(), null);
-    return new HBaseRecordReader(hbaseConf, hbaseSpec, subScan.getColumns(), context);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
deleted file mode 100644
index 7ea4cbf..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractBase;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-
-// Sub-scan operator holding the region scan specs, columns and table type of a MapR-DB scan.
-@JsonTypeName("maprdb-sub-scan")
-public class MapRDBSubScan extends AbstractBase implements SubScan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class);
-
-  @JsonProperty
-  public final StoragePluginConfig storage;
-  @JsonIgnore
-  private final MapRDBFormatPluginConfig fsFormatPluginConfig;
-  private final FileSystemPlugin fsStoragePlugin;
-  private final List<MapRDBSubScanSpec> regionScanSpecList;
-  private final List<SchemaPath> columns;
-  private final String tableType; // MapRDBScanBatchCreator compares this with BinaryTableGroupScan.TABLE_BINARY to pick the reader
-
-  @JsonCreator // deserialization path: the storage plugin is looked up in the injected registry from the "storage" config
-  public MapRDBSubScan(@JacksonInject StoragePluginRegistry registry,
-                       @JsonProperty("userName") String userName,
-                       @JsonProperty("formatPluginConfig") MapRDBFormatPluginConfig formatPluginConfig,
-                       @JsonProperty("storage") StoragePluginConfig storage,
-                       @JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList,
-                       @JsonProperty("columns") List<SchemaPath> columns,
-                       @JsonProperty("tableType") String tableType) throws ExecutionSetupException {
-    super(userName);
-    this.fsFormatPluginConfig = formatPluginConfig;
-    this.fsStoragePlugin = (FileSystemPlugin) registry.getPlugin(storage); // the cast requires the resolved plugin to be a FileSystemPlugin
-    this.regionScanSpecList = regionScanSpecList;
-    this.storage = storage;
-    this.columns = columns;
-    this.tableType = tableType;
-  }
-
-  public MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig config,
-      List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) { // plugin already resolved; used by getNewWithChildren
-    super(userName);
-    fsFormatPluginConfig = formatPluginConfig;
-    fsStoragePlugin = storagePlugin;
-    storage = config;
-    this.regionScanSpecList = maprSubScanSpecs;
-    this.columns = columns;
-    this.tableType = tableType;
-  }
-
-  public List<MapRDBSubScanSpec> getRegionScanSpecList() {
-    return regionScanSpecList;
-  }
-
-  public List<SchemaPath> getColumns() {
-    return columns;
-  }
-
-  @Override
-  public boolean isExecutable() {
-    return false;
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
-    return physicalVisitor.visitSubScan(this, value);
-  }
-
-  @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    Preconditions.checkArgument(children.isEmpty()); // this operator accepts no children
-    return new MapRDBSubScan(getUserName(), fsFormatPluginConfig, fsStoragePlugin, storage, regionScanSpecList, columns, tableType); // copy sharing the same field references
-  }
-
-  @Override
-  public Iterator<PhysicalOperator> iterator() {
-    return Iterators.emptyIterator(); // no child operators to iterate
-  }
-
-  @Override
-  public int getOperatorType() {
-    return 1001; // NOTE(review): hard-coded operator type id; its meaning is not visible here - confirm against the operator type registry
-  }
-
-  public String getTableType() {
-    return tableType;
-  }
-
-  public MapRDBFormatPluginConfig getFormatPluginConfig() {
-    return fsFormatPluginConfig;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java
deleted file mode 100644
index cc8bc5d..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.mapr.fs.jni.MapRConstants;
-import com.mapr.org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Mutable description of one sub-scan: table name, region server, start/stop row keys and an
- * optional serialized filter. Setters return {@code this} so calls can be chained.
- */
-public class MapRDBSubScanSpec {
-
-  protected String tableName;
-  protected String regionServer;
-  protected byte[] startRow;
-  protected byte[] stopRow;
-  protected byte[] serializedFilter;
-
-  /**
-   * JSON creator.
-   *
-   * @param filterString only checked for a conflict with {@code serializedFilter}; it is not stored.
-   *                     NOTE(review): a spec that carries only a filter string ends up without a filter - confirm intended.
-   * @throws IllegalArgumentException if both {@code serializedFilter} and {@code filterString} are non-null
-   */
-  @JsonCreator
-  public MapRDBSubScanSpec(@JsonProperty("tableName") String tableName,
-                           @JsonProperty("regionServer") String regionServer,
-                           @JsonProperty("startRow") byte[] startRow,
-                           @JsonProperty("stopRow") byte[] stopRow,
-                           @JsonProperty("serializedFilter") byte[] serializedFilter,
-                           @JsonProperty("filterString") String filterString) {
-    final boolean bothFilterFormsGiven = serializedFilter != null && filterString != null;
-    if (bothFilterFormsGiven) {
-      throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time.");
-    }
-    this.serializedFilter = serializedFilter;
-    this.stopRow = stopRow;
-    this.startRow = startRow;
-    this.regionServer = regionServer;
-    this.tableName = tableName;
-  }
-
-  /* package */ MapRDBSubScanSpec() {
-    // intentionally empty: fields are filled in afterwards through the chained setters
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-
-  public MapRDBSubScanSpec setTableName(String tableName) {
-    this.tableName = tableName;
-    return this;
-  }
-
-  public String getRegionServer() {
-    return regionServer;
-  }
-
-  public MapRDBSubScanSpec setRegionServer(String regionServer) {
-    this.regionServer = regionServer;
-    return this;
-  }
-
-  /**
-   * @return the raw (not-encoded) start row key, or the shared empty array when none was set
-   */
-  public byte[] getStartRow() {
-    return startRow != null ? startRow : MapRConstants.EMPTY_BYTE_ARRAY;
-  }
-
-  public MapRDBSubScanSpec setStartRow(byte[] startRow) {
-    this.startRow = startRow;
-    return this;
-  }
-
-  /**
-   * @return the raw (not-encoded) stop row key, or the shared empty array when none was set
-   */
-  public byte[] getStopRow() {
-    return stopRow != null ? stopRow : MapRConstants.EMPTY_BYTE_ARRAY;
-  }
-
-  public MapRDBSubScanSpec setStopRow(byte[] stopRow) {
-    this.stopRow = stopRow;
-    return this;
-  }
-
-  public byte[] getSerializedFilter() {
-    return serializedFilter;
-  }
-
-  public MapRDBSubScanSpec setSerializedFilter(byte[] serializedFilter) {
-    this.serializedFilter = serializedFilter;
-    return this;
-  }
-
-  /** Renders the row keys in binary-string form and the filter in Base64; unset values print as "null". */
-  @Override
-  public String toString() {
-    final StringBuilder text = new StringBuilder();
-    text.append("MapRDBSubScanSpec [tableName=").append(tableName);
-    text.append(", startRow=").append(startRow == null ? null : Bytes.toStringBinary(startRow));
-    text.append(", stopRow=").append(stopRow == null ? null : Bytes.toStringBinary(stopRow));
-    text.append(", filter=").append(getSerializedFilter() == null ? null : Bytes.toBase64(getSerializedFilter()));
-    text.append(", regionServer=").append(regionServer).append("]");
-    return text.toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java
deleted file mode 100644
index d2b1453..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.mapr.TableMappingRulesFactory;
-
-import com.mapr.fs.hbase.HBaseAdminImpl;
-
-/**
- * Row-count statistics for one table, read through an admin object shared by all instances.
- */
-public class MapRDBTableStats {
-  // Created once on first use from the configuration of the first caller, then reused.
-  private static volatile HBaseAdminImpl admin;
-
-  private final long numRows;
-
-  /**
-   * Reads the row count of {@code tablePath}.
-   *
-   * @param conf      configuration used to create the shared admin if it does not exist yet
-   * @param tablePath table whose row count is requested
-   * @throws Exception if creating the admin or querying the row count fails
-   */
-  public MapRDBTableStats(Configuration conf, String tablePath) throws Exception {
-    this.numRows = sharedAdmin(conf).getNumRows(tablePath);
-  }
-
-  // Returns the shared admin, creating it under the class lock after a second null check.
-  private static HBaseAdminImpl sharedAdmin(Configuration conf) throws Exception {
-    if (admin == null) {
-      synchronized (MapRDBTableStats.class) {
-        if (admin == null) {
-          admin = new HBaseAdminImpl(conf, TableMappingRulesFactory.create(conf));
-        }
-      }
-    }
-    return admin;
-  }
-
-  public long getNumRows() {
-    return numRows;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java
deleted file mode 100644
index 389f00d..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-
-import com.mapr.db.impl.TabletInfoImpl;
-
-/**
- * Wraps either an {@link HRegionInfo} or a {@link TabletInfoImpl}. The public constructors set
- * exactly one of the two; every accessor uses the tablet info when it is present and falls back
- * to the region info otherwise.
- */
-public class TabletFragmentInfo  implements Comparable<TabletFragmentInfo> {
-
-  private final HRegionInfo regionInfo;
-  private final TabletInfoImpl tabletInfoImpl;
-
-  public TabletFragmentInfo(HRegionInfo regionInfo) {
-    this(null, regionInfo);
-  }
-
-  public TabletFragmentInfo(TabletInfoImpl tabletInfoImpl) {
-    this(tabletInfoImpl, null);
-  }
-
-  TabletFragmentInfo(TabletInfoImpl tabletInfoImpl, HRegionInfo regionInfo) {
-    this.tabletInfoImpl = tabletInfoImpl;
-    this.regionInfo = regionInfo;
-  }
-
-  public HRegionInfo getRegionInfo() {
-    return regionInfo;
-  }
-
-  public TabletInfoImpl getTabletInfoImpl() {
-    return tabletInfoImpl;
-  }
-
-  public boolean containsRow(byte[] row) {
-    if (tabletInfoImpl == null) {
-      return regionInfo.containsRow(row);
-    }
-    return tabletInfoImpl.containsRow(row);
-  }
-
-  public byte[] getStartKey() {
-    if (tabletInfoImpl == null) {
-      return regionInfo.getStartKey();
-    }
-    return tabletInfoImpl.getStartRow();
-  }
-
-  public byte[] getEndKey() {
-    if (tabletInfoImpl == null) {
-      return regionInfo.getEndKey();
-    }
-    return tabletInfoImpl.getStopRow();
-  }
-
-  /**
-   * Delegates to whichever wrapped object this instance holds.
-   * NOTE(review): when {@code o} wraps the other kind, the delegate is handed {@code null} - confirm callers never mix kinds.
-   */
-  @Override
-  public int compareTo(TabletFragmentInfo o) {
-    if (tabletInfoImpl == null) {
-      return regionInfo.compareTo(o.regionInfo);
-    }
-    return tabletInfoImpl.compareTo(o.tabletInfoImpl);
-  }
-
-  @Override
-  public int hashCode() {
-    // Same 31-based combination as before: the region hash first, then the tablet hash.
-    final int regionHash = regionInfo == null ? 0 : regionInfo.hashCode();
-    final int tabletHash = tabletInfoImpl == null ? 0 : tabletInfoImpl.hashCode();
-    return 31 * (31 + regionHash) + tabletHash;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    final TabletFragmentInfo that = (TabletFragmentInfo) obj;
-    final boolean sameRegion = regionInfo == null ? that.regionInfo == null : regionInfo.equals(that.regionInfo);
-    if (!sameRegion) {
-      return false;
-    }
-    return tabletInfoImpl == null ? that.tabletInfoImpl == null : tabletInfoImpl.equals(that.tabletInfoImpl);
-  }
-
-  @Override
-  public String toString() {
-    return new StringBuilder("TabletFragmentInfo [regionInfo=")
-        .append(regionInfo)
-        .append(", tabletInfoImpl=")
-        .append(tabletInfoImpl)
-        .append("]")
-        .toString();
-  }
-
-}


Mime
View raw message