Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1FBC4200B95 for ; Tue, 13 Sep 2016 03:31:56 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1E55C160AB8; Tue, 13 Sep 2016 01:31:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E95BF160AD9 for ; Tue, 13 Sep 2016 03:31:52 +0200 (CEST) Received: (qmail 16264 invoked by uid 500); 13 Sep 2016 01:31:52 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 14606 invoked by uid 99); 13 Sep 2016 01:31:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2016 01:31:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D9198E00C4; Tue, 13 Sep 2016 01:31:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: adi@apache.org To: commits@drill.apache.org Date: Tue, 13 Sep 2016 01:32:24 -0000 Message-Id: <5c3df63482f64390ae32b9e47d4653ff@git.apache.org> In-Reply-To: <12bc6f8dff7b420fa5513c35113a14e5@git.apache.org> References: <12bc6f8dff7b420fa5513c35113a14e5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [37/50] [abbrv] drill git commit: Refactoring code for better organization. archived-at: Tue, 13 Sep 2016 01:31:56 -0000 http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java new file mode 100644 index 0000000..7fbcd1b --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java @@ -0,0 +1,569 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr.db.json; + +import static org.ojai.DocumentConstants.ID_KEY; +import io.netty.buffer.DrillBuf; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.Stack; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig; +import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec; +import org.apache.drill.exec.vector.BaseValueVector; +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; +import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter; +import org.apache.drill.exec.vector.complex.writer.VarCharWriter; +import org.ojai.DocumentReader; +import org.ojai.DocumentReader.EventType; +import org.ojai.DocumentStream; +import org.ojai.FieldPath; +import org.ojai.FieldSegment; +import org.ojai.Value; +import org.ojai.store.QueryCondition; +import org.ojai.types.OTime; + +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; +import com.mapr.db.MapRDB; +import com.mapr.db.Table; +import com.mapr.db.Table.TableOption; +import com.mapr.db.exceptions.DBException; +import com.mapr.db.impl.IdCodec; +import com.mapr.db.ojai.DBDocumentReaderBase; +import com.mapr.db.util.ByteBufs; +import com.mapr.org.apache.hadoop.hbase.util.Bytes; + +public class MaprDBJsonRecordReader extends AbstractRecordReader { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MaprDBJsonRecordReader.class); + + public static final SchemaPath ID_PATH = SchemaPath.getSimplePath(ID_KEY); + + private Table table; + private QueryCondition condition; + private FieldPath[] projectedFields; + + private final String tableName; + private OperatorContext operatorContext; + private VectorContainerWriter writer; + + private DrillBuf buffer; + + private DocumentStream documentStream; + + private Iterator documentReaderIterators; + + private boolean includeId; + private boolean idOnly; + private final boolean unionEnabled; + private final boolean readNumbersAsDouble; + private final boolean allTextMode; + private final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24; + + public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, + MapRDBFormatPluginConfig formatPluginConfig, + List projectedColumns, FragmentContext context) { + buffer = context.getManagedBuffer(); + projectedFields = null; + tableName = Preconditions.checkNotNull(subScanSpec, "MapRDB reader needs a sub-scan spec").getTableName(); + documentReaderIterators = null; + includeId = false; + idOnly = false; + byte[] serializedFilter = subScanSpec.getSerializedFilter(); + condition = null; + + if (serializedFilter != null) { + condition = com.mapr.db.impl.ConditionImpl.parseFrom(ByteBufs.wrap(serializedFilter)); + } + + setColumns(projectedColumns); + unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE); + readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble(); + allTextMode = formatPluginConfig.isAllTextMode(); + } + + @Override + protected Collection transformColumns(Collection columns) { + Set transformed = Sets.newLinkedHashSet(); + if (!isStarQuery()) { + Set projectedFieldsSet = Sets.newTreeSet(); + for (SchemaPath column : columns) { + if (column.getRootSegment().getPath().equalsIgnoreCase(ID_KEY)) { + /* + * we do not include _id field in the set of projected fields + * because the DB currently can not return a document if only + * the _id field was projected. This should really be fixed in + * the DB client (Bug 21708) to avoid transferring the entire + * document when only _id is requested. + */ + // projectedFieldsList.add(ID_FIELD); + includeId = true; + } else { + projectedFieldsSet.add(getFieldPathForProjection(column)); + } + transformed.add(column); + } + if (projectedFieldsSet.size() > 0) { + projectedFields = projectedFieldsSet.toArray(new FieldPath[projectedFieldsSet.size()]); + } + } else { + transformed.add(AbstractRecordReader.STAR_COLUMN); + includeId = true; + } + + /* + * (Bug 21708) if we are projecting only the id field, save that condition here. + */ + idOnly = !isStarQuery() && (projectedFields == null); + return transformed; + } + + @Override + public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { + this.writer = new VectorContainerWriter(output, unionEnabled); + this.operatorContext = context; + + try { + table = MapRDB.getTable(tableName); + table.setOption(TableOption.EXCLUDEID, !includeId); + documentStream = table.find(condition, projectedFields); + documentReaderIterators = documentStream.documentReaders().iterator(); + } catch (DBException e) { + throw new ExecutionSetupException(e); + } + } + + @Override + public int next() { + Stopwatch watch = Stopwatch.createUnstarted(); + watch.start(); + + writer.allocate(); + writer.reset(); + + int recordCount = 0; + DBDocumentReaderBase reader = null; + + while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) { + try { + reader = nextDocumentReader(); + if (reader == null) break; + writer.setPosition(recordCount); + if (idOnly) { + Value id = reader.getId(); + MapWriter map = writer.rootAsMap(); + + try { + switch(id.getType()) { + case STRING: + writeString(map.varChar(ID_KEY), id.getString()); + recordCount++; + break; + case BINARY: + if (allTextMode) { + writeString(map.varChar(ID_KEY), new String(id.getBinary().array(), Charset.forName("UTF-8"))); + } else { + writeBinary(map.varBinary(ID_KEY), id.getBinary()); + } + recordCount++; + break; + default: + throw new UnsupportedOperationException(id.getType() + + " is not a supported type for _id field."); + } + } catch (IllegalStateException | IllegalArgumentException e) { + logger.warn(String.format("Possible schema change at _id: '%s'", + IdCodec.asString(id)), e); + } + } else { + if (reader.next() != EventType.START_MAP) { + throw dataReadError("The document did not start with START_MAP!"); + } + writeToMap(reader, writer.rootAsMap()); + recordCount++; + } + } catch (UserException e) { + throw UserException.unsupportedError(e) + .addContext(String.format("Table: %s, document id: '%s'", + table.getPath(), + reader == null ? null : IdCodec.asString(reader.getId()))) + .build(logger); + } + } + + writer.setValueCount(recordCount); + logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), recordCount); + return recordCount; + } + + private void writeToMap(DBDocumentReaderBase reader, MapWriter map) { + map.start(); + outside: while (true) { + EventType event = reader.next(); + if (event == null || event == EventType.END_MAP) break outside; + + String fieldName = reader.getFieldName(); + try { + switch (event) { + case NULL: + break; // not setting the field will leave it as null + case BINARY: + if (allTextMode) { + writeString(map.varChar(fieldName), new String(reader.getBinary().array(), Charset.forName("UTF-8"))); + } else { + writeBinary(map.varBinary(fieldName), reader.getBinary()); + } + break; + case BOOLEAN: + if (allTextMode) { + writeString(map.varChar(fieldName), String.valueOf(reader.getBoolean())); + } else { + map.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0); + } + break; + case STRING: + writeString(map.varChar(fieldName), reader.getString()); + break; + case BYTE: + if (allTextMode) { + writeString(map.varChar(fieldName), String.valueOf(reader.getByte())); + } else if (readNumbersAsDouble) { + map.float8(fieldName).writeFloat8(reader.getByte()); + } else { + map.tinyInt(fieldName).writeTinyInt(reader.getByte()); + } + break; + case SHORT: + if (allTextMode) { + writeString(map.varChar(fieldName), String.valueOf(reader.getShort())); + } else if (readNumbersAsDouble) { + map.float8(fieldName).writeFloat8(reader.getShort()); + } else { + map.smallInt(fieldName).writeSmallInt(reader.getShort()); + } + break; + case INT: + if (allTextMode) { + writeString(map.varChar(fieldName), String.valueOf(reader.getInt())); + } else if (readNumbersAsDouble) { + map.float8(fieldName).writeFloat8(reader.getInt()); + } else { + map.integer(fieldName).writeInt(reader.getInt()); + } + break; + case LONG: + if (allTextMode) { + writeString(map.varChar(fieldName), String.valueOf(reader.getLong())); + } else if (readNumbersAsDouble) { + map.float8(fieldName).writeFloat8(reader.getLong()); + } else { + map.bigInt(fieldName).writeBigInt(reader.getLong()); + } + break; + case FLOAT: + if (allTextMode) { + writeString(map.varChar(fieldName), String.valueOf(reader.getFloat())); + } else if (readNumbersAsDouble) { + map.float8(fieldName).writeFloat8(reader.getFloat()); + } else { + map.float4(fieldName).writeFloat4(reader.getFloat()); + } + break; + case DOUBLE: + if (allTextMode) { + writeString(map.varChar(fieldName), String.valueOf(reader.getDouble())); + } else { + map.float8(fieldName).writeFloat8(reader.getDouble()); + } + break; + case DECIMAL: + throw unsupportedError("Decimal type is currently not supported."); + case DATE: + if (allTextMode) { + writeString(map.varChar(fieldName), reader.getDate().toString()); + } else { + + long milliSecondsSinceEpoch = reader.getDate().toDaysSinceEpoch() * MILLISECONDS_IN_A_DAY; + map.date(fieldName).writeDate(milliSecondsSinceEpoch); + } + break; + case TIME: + if (allTextMode) { + writeString(map.varChar(fieldName), reader.getTime().toString()); + } else { + OTime t = reader.getTime(); + int h = t.getHour(); + int m = t.getMinute(); + int s = t.getSecond(); + int ms = t.getMilliSecond(); + int millisOfDay = ms + (s + ((m + (h * 60)) * 60)) * 1000; + map.time(fieldName).writeTime(millisOfDay); + } + break; + case TIMESTAMP: + if (allTextMode) { + writeString(map.varChar(fieldName), reader.getTimestamp().toString()); + } else { + map.timeStamp(fieldName).writeTimeStamp(reader.getTimestampLong()); + } + break; + case INTERVAL: + throw unsupportedError("Interval type is currently not supported."); + case START_MAP: + writeToMap(reader, map.map(fieldName)); + break; + case START_ARRAY: + writeToList(reader, map.list(fieldName)); + break; + case END_ARRAY: + throw dataReadError("Encountered an END_ARRAY event inside a map."); + default: + throw unsupportedError("Unsupported type: %s encountered during the query.", event); + } + } catch (IllegalStateException | IllegalArgumentException e) { + logger.warn(String.format("Possible schema change at _id: '%s', field: '%s'", + IdCodec.asString(reader.getId()), fieldName), e); + } + } + map.end(); + } + + private void writeToList(DBDocumentReaderBase reader, ListWriter list) { + list.startList(); + outside: while (true) { + EventType event = reader.next(); + if (event == null || event == EventType.END_ARRAY) break outside; + + switch (event) { + case NULL: + throw unsupportedError("Null values are not supported in lists."); + case BINARY: + if (allTextMode) { + writeString(list.varChar(), new String(reader.getBinary().array(), Charset.forName("UTF-8"))); + } else { + writeBinary(list.varBinary(), reader.getBinary()); + } + break; + case BOOLEAN: + if (allTextMode) { + writeString(list.varChar(), String.valueOf(reader.getBoolean())); + } else { + list.bit().writeBit(reader.getBoolean() ? 1 : 0); + } + break; + case STRING: + writeString(list.varChar(), reader.getString()); + break; + case BYTE: + if (allTextMode) { + writeString(list.varChar(), String.valueOf(reader.getByte())); + } else if (readNumbersAsDouble) { + list.float8().writeFloat8(reader.getByte()); + } else { + list.tinyInt().writeTinyInt(reader.getByte()); + } + break; + case SHORT: + if (allTextMode) { + writeString(list.varChar(), String.valueOf(reader.getShort())); + } else if (readNumbersAsDouble) { + list.float8().writeFloat8(reader.getShort()); + } else { + list.smallInt().writeSmallInt(reader.getShort()); + } + break; + case INT: + if (allTextMode) { + writeString(list.varChar(), String.valueOf(reader.getInt())); + } else if (readNumbersAsDouble) { + list.float8().writeFloat8(reader.getInt()); + } else { + list.integer().writeInt(reader.getInt()); + } + break; + case LONG: + if (allTextMode) { + writeString(list.varChar(), String.valueOf(reader.getLong())); + } else if (readNumbersAsDouble) { + list.float8().writeFloat8(reader.getLong()); + } else { + list.bigInt().writeBigInt(reader.getLong()); + } + break; + case FLOAT: + if (allTextMode) { + writeString(list.varChar(), String.valueOf(reader.getFloat())); + } else if (readNumbersAsDouble) { + list.float8().writeFloat8(reader.getFloat()); + } else { + list.float4().writeFloat4(reader.getFloat()); + } + break; + case DOUBLE: + if (allTextMode) { + writeString(list.varChar(), String.valueOf(reader.getDouble())); + } else { + list.float8().writeFloat8(reader.getDouble()); + } + break; + case DECIMAL: + throw unsupportedError("Decimals are currently not supported."); + case DATE: + if (allTextMode) { + writeString(list.varChar(), reader.getDate().toString()); + } else { + long milliSecondsSinceEpoch = reader.getDate().toDaysSinceEpoch() * MILLISECONDS_IN_A_DAY; + list.date().writeDate(milliSecondsSinceEpoch); + } + break; + case TIME: + if (allTextMode) { + writeString(list.varChar(), reader.getTime().toString()); + } else { + OTime t = reader.getTime(); + int h = t.getHour(); + int m = t.getMinute(); + int s = t.getSecond(); + int ms = t.getMilliSecond(); + int millisOfDay = ms + (s + ((m + (h * 60)) * 60)) * 1000; + list.time().writeTime(millisOfDay); + } + break; + case TIMESTAMP: + if (allTextMode) { + writeString(list.varChar(), reader.getTimestamp().toString()); + } else { + list.timeStamp().writeTimeStamp(reader.getTimestampLong()); + } + break; + case INTERVAL: + throw unsupportedError("Interval is currently not supported."); + case START_MAP: + writeToMap(reader, list.map()); + break; + case END_MAP: + throw dataReadError("Encountered an END_MAP event inside a list."); + case START_ARRAY: + writeToList(reader, list.list()); + break; + default: + throw unsupportedError("Unsupported type: %s encountered during the query.%s", event); + } + } + list.endList(); + } + + private void writeBinary(VarBinaryWriter binaryWriter, ByteBuffer buf) { + buffer = buffer.reallocIfNeeded(buf.remaining()); + buffer.setBytes(0, buf, buf.position(), buf.remaining()); + binaryWriter.writeVarBinary(0, buf.remaining(), buffer); + } + + private void writeString(VarCharWriter varCharWriter, String string) { + final byte[] strBytes = Bytes.toBytes(string); + buffer = buffer.reallocIfNeeded(strBytes.length); + buffer.setBytes(0, strBytes); + varCharWriter.writeVarChar(0, strBytes.length, buffer); + } + + private UserException unsupportedError(String format, Object... args) { + return UserException.unsupportedError() + .message(String.format(format, args)) + .build(logger); + } + + private UserException dataReadError(String format, Object... args) { + return UserException.dataReadError() + .message(String.format(format, args)) + .build(logger); + } + + private DBDocumentReaderBase nextDocumentReader() { + final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats(); + try { + if (operatorStats != null) { + operatorStats.startWait(); + } + try { + if (!documentReaderIterators.hasNext()) { + return null; + } else { + return (DBDocumentReaderBase) documentReaderIterators.next(); + } + } finally { + if (operatorStats != null) { + operatorStats.stopWait(); + } + } + } catch (DBException e) { + throw UserException.dataReadError(e).build(logger); + } + } + + /* + * Extracts contiguous named segments from the SchemaPath, starting from the + * root segment and build the FieldPath from it for projection. + * + * This is due to bug 22726 and 22727, which cause DB's DocumentReaders to + * behave incorrectly for sparse lists, hence we avoid projecting beyond the + * first encountered ARRAY field and let Drill handle the projection. + */ + private static FieldPath getFieldPathForProjection(SchemaPath column) { + Stack pathSegments = new Stack(); + PathSegment seg = column.getRootSegment(); + while (seg != null && seg.isNamed()) { + pathSegments.push((PathSegment.NameSegment) seg); + seg = seg.getChild(); + } + FieldSegment.NameSegment child = null; + while (!pathSegments.isEmpty()) { + child = new FieldSegment.NameSegment(pathSegments.pop().getPath(), child, false); + } + return new FieldPath(child); + } + + @Override + public void close() { + if (documentStream != null) { + documentStream.close(); + } + if (table != null) { + table.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/util/CommonFns.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/util/CommonFns.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/util/CommonFns.java new file mode 100644 index 0000000..a7b8cd1 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/util/CommonFns.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr.db.util; + +public class CommonFns { + + public static boolean isNullOrEmpty(final byte[] key) { + return key == null || key.length == 0; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatMatcher.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatMatcher.java new file mode 100644 index 0000000..47e9927 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatMatcher.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr.streams; + +import java.io.IOException; + +import org.apache.drill.exec.store.mapr.TableFormatMatcher; +import org.apache.drill.exec.store.mapr.TableFormatPlugin; + +import com.mapr.fs.MapRFileStatus; + +public class StreamsFormatMatcher extends TableFormatMatcher { + + public StreamsFormatMatcher(TableFormatPlugin plugin) { + super(plugin); + } + + @Override + protected boolean isSupportedTable(MapRFileStatus status) throws IOException { + return getFormatPlugin() + .getMaprFS() + .getTableProperties(status.getPath()) + .getAttr() + .getIsMarlinTable(); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java new file mode 100644 index 0000000..811e245 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr.streams; + +import java.io.IOException; +import java.util.List; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.AbstractWriter; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.mapr.TableFormatPlugin; +import org.apache.hadoop.conf.Configuration; + +public class StreamsFormatPlugin extends TableFormatPlugin { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamsFormatPlugin.class); + private StreamsFormatMatcher matcher; + + public StreamsFormatPlugin(String name, DrillbitContext context, Configuration fsConf, + StoragePluginConfig storageConfig, StreamsFormatPluginConfig formatConfig) { + super(name, context, fsConf, storageConfig, formatConfig); + matcher = new StreamsFormatMatcher(this); + } + + @Override + public boolean supportsRead() { + return true; + } + + @Override + public boolean supportsWrite() { + return false; + } + + @Override + public boolean supportsAutoPartitioning() { + return false; + } + + @Override + public FormatMatcher getMatcher() { + return matcher; + } + + @Override + public AbstractWriter getWriter(PhysicalOperator child, String location, + List partitionColumns) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, + List columns) throws IOException { + List files = selection.getFiles(); + assert (files.size() == 1); + //TableProperties props = getMaprFS().getTableProperties(new Path(files.get(0))); + throw new UnsupportedOperationException("not implemented"); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPluginConfig.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPluginConfig.java new file mode 100644 index 0000000..b061f03 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPluginConfig.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr.streams; + +import org.apache.drill.exec.store.mapr.TableFormatPluginConfig; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("streams") @JsonInclude(Include.NON_DEFAULT) +public class StreamsFormatPluginConfig extends TableFormatPluginConfig { + + @Override + public int hashCode() { + return 47; + } + + @Override + protected boolean impEquals(Object obj) { + return true; // TODO: compare custom properties once added + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java deleted file mode 100644 index fa084f0..0000000 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatMatcher.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.maprdb; - -import java.io.IOException; - -import org.apache.drill.exec.planner.logical.DrillTable; -import org.apache.drill.exec.planner.logical.DynamicDrillTable; -import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.store.dfs.FileSelection; -import org.apache.drill.exec.store.dfs.FileSystemPlugin; -import org.apache.drill.exec.store.dfs.FormatMatcher; -import org.apache.drill.exec.store.dfs.FormatPlugin; -import org.apache.drill.exec.store.dfs.FormatSelection; -import org.apache.hadoop.fs.FileStatus; - -import com.mapr.fs.MapRFileStatus; - -public class MapRDBFormatMatcher extends FormatMatcher { - - private final FormatPlugin plugin; - - public MapRDBFormatMatcher(FormatPlugin plugin) { - this.plugin = plugin; - } - - @Override - public boolean supportDirectoryReads() { - return false; - } - - public DrillTable isReadable(DrillFileSystem fs, - FileSelection selection, FileSystemPlugin fsPlugin, - String storageEngineName, String userName) throws IOException { - FileStatus status = selection.getFirstPath(fs); - if (!isFileReadable(fs, status)) { - return null; - } - - return new DynamicDrillTable(fsPlugin, storageEngineName, userName, - new FormatSelection(getFormatPlugin().getConfig(), selection)); - } - - @Override - public boolean isFileReadable(DrillFileSystem fs, FileStatus status) throws IOException { - return (status instanceof MapRFileStatus) && ((MapRFileStatus) status).isTable(); - } - - @Override - public FormatPlugin getFormatPlugin() { - return plugin; - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java deleted file mode 100644 index 0694f5b..0000000 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.maprdb; - -import static com.mapr.fs.jni.MapRConstants.MAPRFS_PREFIX; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.List; -import java.util.Set; - -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.logical.FormatPluginConfig; -import org.apache.drill.common.logical.StoragePluginConfig; -import org.apache.drill.exec.physical.base.AbstractGroupScan; -import org.apache.drill.exec.physical.base.AbstractWriter; -import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.store.StoragePluginOptimizerRule; -import org.apache.drill.exec.store.dfs.FileSelection; -import org.apache.drill.exec.store.dfs.FileSystemConfig; -import org.apache.drill.exec.store.dfs.FileSystemPlugin; -import org.apache.drill.exec.store.dfs.FormatMatcher; -import org.apache.drill.exec.store.dfs.FormatPlugin; -import org.apache.drill.exec.store.hbase.HBaseScanSpec; -import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan; -import org.apache.drill.exec.store.maprdb.json.JsonScanSpec; -import org.apache.drill.exec.store.maprdb.json.JsonTableGroupScan; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.common.collect.ImmutableSet; -import com.mapr.fs.MapRFileSystem; -import com.mapr.fs.tables.TableProperties; - -public class MapRDBFormatPlugin implements FormatPlugin { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory - .getLogger(MapRDBFormatPlugin.class); - - private final FileSystemConfig storageConfig; - private final MapRDBFormatPluginConfig config; - private final MapRDBFormatMatcher matcher; - private final Configuration fsConf; - private final DrillbitContext context; - private final String name; - - private volatile FileSystemPlugin storagePlugin; - private volatile MapRFileSystem maprfs; - - public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf, - StoragePluginConfig storageConfig) { - this(name, context, fsConf, storageConfig, new MapRDBFormatPluginConfig()); - } - - public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf, - StoragePluginConfig storageConfig, MapRDBFormatPluginConfig formatConfig) { - this.context = context; - this.config = formatConfig; - this.matcher = new MapRDBFormatMatcher(this); - this.storageConfig = (FileSystemConfig) storageConfig; - this.fsConf = fsConf; - this.name = name == null ? "maprdb" : name; - try { - this.maprfs = new MapRFileSystem(); - maprfs.initialize(new URI(MAPRFS_PREFIX), fsConf); - } catch (IOException | URISyntaxException e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean supportsRead() { - return true; - } - - @Override - public boolean supportsWrite() { - return false; - } - - @Override - public boolean supportsAutoPartitioning() { - return false; - } - - @Override - public FormatMatcher getMatcher() { - return matcher; - } - - public Configuration getFsConf() { - return fsConf; - } - - @Override - public AbstractWriter getWriter(PhysicalOperator child, String location, - List partitionColumns) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - @JsonIgnore - public Set getOptimizerRules() { - return ImmutableSet.of(MapRDBPushFilterIntoScan.FILTER_ON_SCAN, MapRDBPushFilterIntoScan.FILTER_ON_PROJECT); - } - - @Override - public AbstractGroupScan getGroupScan(String userName, FileSelection selection, - List columns) throws IOException { - List files = selection.getFiles(); - assert (files.size() == 1); - String tableName = files.get(0); - TableProperties props = maprfs.getTableProperties(new Path(tableName)); - - if (props.getAttr().getJson()) { - JsonScanSpec scanSpec = new JsonScanSpec(tableName, null/*condition*/); - return new JsonTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns); - } else { - HBaseScanSpec scanSpec = new HBaseScanSpec(tableName); - return new BinaryTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns); - } - } - - @Override - public FormatPluginConfig getConfig() { - return config; - } - - @Override - public StoragePluginConfig getStorageConfig() { - return storageConfig; - } - - @Override - public DrillbitContext getContext() { - return context; - } - - @Override - public String getName() { - return name; - } - - public synchronized FileSystemPlugin getStoragePlugin() { - if (this.storagePlugin == null) { - try { - this.storagePlugin = (FileSystemPlugin) (context.getStorage().getPlugin(storageConfig)); - } catch (ExecutionSetupException e) { - throw new RuntimeException(e); - } - } - return storagePlugin; - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPluginConfig.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPluginConfig.java deleted file mode 100644 index eb341d9..0000000 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPluginConfig.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.maprdb; - -import org.apache.drill.common.logical.FormatPluginConfig; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonInclude.Include; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; - -@JsonTypeName("maprdb") @JsonInclude(Include.NON_DEFAULT) -public class MapRDBFormatPluginConfig implements FormatPluginConfig { - - private boolean allTextMode = false; - private boolean readAllNumbersAsDouble = false; - - @Override - public int hashCode() { - return 53; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - - if (obj == null) { - return false; - } - - if (getClass() != obj.getClass()) { - return false; - } - - MapRDBFormatPluginConfig other = (MapRDBFormatPluginConfig)obj; - - if (readAllNumbersAsDouble != other.readAllNumbersAsDouble) { - return false; - } - - if (allTextMode != other.allTextMode) { - return false; - } - - return true; - } - - public boolean isReadAllNumbersAsDouble() { - return readAllNumbersAsDouble; - } - - public boolean isAllTextMode() { - return allTextMode; - } - - @JsonProperty("allTextMode") - public void setAllTextMode(boolean mode) { - allTextMode = mode; - } - - @JsonProperty("readAllNumbersAsDouble") - public void setReadAllNumbersAsDouble(boolean read) { - readAllNumbersAsDouble = read; - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java deleted file mode 100644 index 393bfe5..0000000 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java +++ /dev/null @@ -1,283 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.maprdb; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NavigableMap; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.physical.EndpointAffinity; -import org.apache.drill.exec.physical.base.AbstractGroupScan; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.store.dfs.FileSystemConfig; -import org.apache.drill.exec.store.dfs.FileSystemPlugin; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -public abstract class MapRDBGroupScan extends AbstractGroupScan { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class); - - private FileSystemPlugin storagePlugin; - - private MapRDBFormatPlugin formatPlugin; - - protected MapRDBFormatPluginConfig formatPluginConfig; - - protected List columns; - - protected Map> endpointFragmentMapping; - - protected NavigableMap regionsToScan; - - private boolean filterPushedDown = false; - - private Stopwatch watch = Stopwatch.createUnstarted(); - - private static final Comparator> LIST_SIZE_COMPARATOR = new Comparator>() { - @Override - public int compare(List list1, List list2) { - return list1.size() - list2.size(); - } - }; - - private static final Comparator> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR); - - public MapRDBGroupScan(MapRDBGroupScan that) { - super(that); - this.columns = that.columns; - this.formatPlugin = that.formatPlugin; - this.formatPluginConfig = that.formatPluginConfig; - this.storagePlugin = that.storagePlugin; - this.regionsToScan = that.regionsToScan; - this.filterPushedDown = that.filterPushedDown; - } - - public MapRDBGroupScan(FileSystemPlugin storagePlugin, - MapRDBFormatPlugin formatPlugin, List columns, String userName) { - super(userName); - this.storagePlugin = storagePlugin; - this.formatPlugin = formatPlugin; - this.formatPluginConfig = (MapRDBFormatPluginConfig)formatPlugin.getConfig(); - this.columns = columns; - } - - @Override - public List getOperatorAffinity() { - watch.reset(); - watch.start(); - Map endpointMap = new HashMap(); - for (DrillbitEndpoint ep : formatPlugin.getContext().getBits()) { - endpointMap.put(ep.getAddress(), ep); - } - - Map affinityMap = new HashMap(); - for (String serverName : regionsToScan.values()) { - DrillbitEndpoint ep = endpointMap.get(serverName); - if (ep != null) { - EndpointAffinity affinity = affinityMap.get(ep); - if (affinity == null) { - affinityMap.put(ep, new EndpointAffinity(ep, 1)); - } else { - affinity.addAffinity(1); - } - } - } - logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS)/1000); - return Lists.newArrayList(affinityMap.values()); - } - - /** - * - * @param incomingEndpoints - */ - @Override - public void applyAssignments(List incomingEndpoints) { - watch.reset(); - watch.start(); - - final int numSlots = incomingEndpoints.size(); - Preconditions.checkArgument(numSlots <= regionsToScan.size(), - String.format("Incoming endpoints %d is greater than number of scan regions %d", numSlots, regionsToScan.size())); - - /* - * Minimum/Maximum number of assignment per slot - */ - final int minPerEndpointSlot = (int) Math.floor((double)regionsToScan.size() / numSlots); - final int maxPerEndpointSlot = (int) Math.ceil((double)regionsToScan.size() / numSlots); - - /* - * initialize (endpoint index => HBaseSubScanSpec list) map - */ - endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots); - - /* - * another map with endpoint (hostname => corresponding index list) in 'incomingEndpoints' list - */ - Map> endpointHostIndexListMap = Maps.newHashMap(); - - /* - * Initialize these two maps - */ - for (int i = 0; i < numSlots; ++i) { - endpointFragmentMapping.put(i, new ArrayList(maxPerEndpointSlot)); - String hostname = incomingEndpoints.get(i).getAddress(); - Queue hostIndexQueue = endpointHostIndexListMap.get(hostname); - if (hostIndexQueue == null) { - hostIndexQueue = Lists.newLinkedList(); - endpointHostIndexListMap.put(hostname, hostIndexQueue); - } - hostIndexQueue.add(i); - } - - Set> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet()); - - /* - * First, we assign regions which are hosted on region servers running on drillbit endpoints - */ - for (Iterator> regionsIterator = regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) { - Entry regionEntry = regionsIterator.next(); - /* - * Test if there is a drillbit endpoint which is also an HBase RegionServer that hosts the current HBase region - */ - Queue endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue()); - if (endpointIndexlist != null) { - Integer slotIndex = endpointIndexlist.poll(); - List endpointSlotScanList = endpointFragmentMapping.get(slotIndex); - endpointSlotScanList.add(getSubScanSpec(regionEntry.getKey())); - // add to the tail of the slot list, to add more later in round robin fashion - endpointIndexlist.offer(slotIndex); - // this region has been assigned - regionsIterator.remove(); - } - } - - /* - * Build priority queues of slots, with ones which has tasks lesser than 'minPerEndpointSlot' and another which have more. - */ - PriorityQueue> minHeap = new PriorityQueue>(numSlots, LIST_SIZE_COMPARATOR); - PriorityQueue> maxHeap = new PriorityQueue>(numSlots, LIST_SIZE_COMPARATOR_REV); - for(List listOfScan : endpointFragmentMapping.values()) { - if (listOfScan.size() < minPerEndpointSlot) { - minHeap.offer(listOfScan); - } else if (listOfScan.size() > minPerEndpointSlot){ - maxHeap.offer(listOfScan); - } - } - - /* - * Now, let's process any regions which remain unassigned and assign them to slots with minimum number of assignments. - */ - if (regionsToAssignSet.size() > 0) { - for (Entry regionEntry : regionsToAssignSet) { - List smallestList = minHeap.poll(); - smallestList.add(getSubScanSpec(regionEntry.getKey())); - if (smallestList.size() < maxPerEndpointSlot) { - minHeap.offer(smallestList); - } - } - } - - /* - * While there are slots with lesser than 'minPerEndpointSlot' unit work, balance from those with more. - */ - while(minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) { - List smallestList = (List) minHeap.poll(); - List largestList = (List) maxHeap.poll(); - smallestList.add(largestList.remove(largestList.size()-1)); - if (largestList.size() > minPerEndpointSlot) { - maxHeap.offer(largestList); - } - if (smallestList.size() < minPerEndpointSlot) { - minHeap.offer(smallestList); - } - } - - /* no slot should be empty at this point */ - assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format( - "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment Map: {}.", - incomingEndpoints, endpointFragmentMapping.toString()); - - logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}", - watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString()); - } - - @Override - public int getMaxParallelizationWidth() { - return regionsToScan.size(); - } - - @JsonIgnore - public MapRDBFormatPlugin getFormatPlugin() { - return formatPlugin; - } - - @Override - public String getDigest() { - return toString(); - } - - @JsonProperty("storage") - public FileSystemConfig getStorageConfig() { - return (FileSystemConfig) storagePlugin.getConfig(); - } - - @JsonIgnore - public FileSystemPlugin getStoragePlugin(){ - return storagePlugin; - } - - @JsonProperty - public List getColumns() { - return columns; - } - - @JsonIgnore - public boolean canPushdownProjects(List columns) { - return true; - } - - @JsonIgnore - public void setFilterPushedDown(boolean b) { - this.filterPushedDown = true; - } - - @JsonIgnore - public boolean isFilterPushedDown() { - return filterPushedDown; - } - - protected abstract MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo key); - -} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java deleted file mode 100644 index c0a33bf..0000000 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.maprdb; - -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.plan.RelOptRuleOperand; -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rex.RexNode; -import org.apache.drill.common.expression.LogicalExpression; -import org.apache.drill.exec.planner.logical.DrillOptiq; -import org.apache.drill.exec.planner.logical.DrillParseContext; -import org.apache.drill.exec.planner.logical.RelOptHelper; -import org.apache.drill.exec.planner.physical.FilterPrel; -import org.apache.drill.exec.planner.physical.PrelUtil; -import org.apache.drill.exec.planner.physical.ProjectPrel; -import org.apache.drill.exec.planner.physical.ScanPrel; -import org.apache.drill.exec.store.StoragePluginOptimizerRule; -import org.apache.drill.exec.store.hbase.HBaseScanSpec; -import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan; -import org.apache.drill.exec.store.maprdb.binary.MapRDBFilterBuilder; -import org.apache.drill.exec.store.maprdb.json.JsonConditionBuilder; -import org.apache.drill.exec.store.maprdb.json.JsonScanSpec; -import org.apache.drill.exec.store.maprdb.json.JsonTableGroupScan; - -import com.google.common.collect.ImmutableList; - -public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class); - - private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) { - super(operand, description); - } - - public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan:Filter_On_Scan") { - - @Override - public void onMatch(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel) call.rel(1); - final FilterPrel filter = (FilterPrel) call.rel(0); - final RexNode condition = filter.getCondition(); - - if (scan.getGroupScan() instanceof BinaryTableGroupScan) { - BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan(); - doPushFilterIntoBinaryGroupScan(call, filter, null, scan, groupScan, condition); - } else { - assert(scan.getGroupScan() instanceof JsonTableGroupScan); - JsonTableGroupScan groupScan = (JsonTableGroupScan)scan.getGroupScan(); - doPushFilterIntoJsonGroupScan(call, filter, null, scan, groupScan, condition); - } - } - - @Override - public boolean matches(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel) call.rel(1); - if (scan.getGroupScan() instanceof BinaryTableGroupScan || - scan.getGroupScan() instanceof JsonTableGroupScan) { - return super.matches(call); - } - return false; - } - }; - - public static final StoragePluginOptimizerRule FILTER_ON_PROJECT = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))), "MapRDBPushFilterIntoScan:Filter_On_Project") { - - @Override - public void onMatch(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel) call.rel(2); - final ProjectPrel project = (ProjectPrel) call.rel(1); - final FilterPrel filter = (FilterPrel) call.rel(0); - - // convert the filter to one that references the child of the project - final RexNode condition = RelOptUtil.pushFilterPastProject(filter.getCondition(), project); - - if (scan.getGroupScan() instanceof BinaryTableGroupScan) { - BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan(); - doPushFilterIntoBinaryGroupScan(call, filter, project, scan, groupScan, condition); - } else { - assert(scan.getGroupScan() instanceof JsonTableGroupScan); - JsonTableGroupScan groupScan = (JsonTableGroupScan)scan.getGroupScan(); - doPushFilterIntoJsonGroupScan(call, filter, project, scan, groupScan, condition); - } - } - - @Override - public boolean matches(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel) call.rel(2); - if (scan.getGroupScan() instanceof BinaryTableGroupScan || - scan.getGroupScan() instanceof JsonTableGroupScan) { - return super.matches(call); - } - return false; - } - }; - - protected void doPushFilterIntoJsonGroupScan(RelOptRuleCall call, - FilterPrel filter, final ProjectPrel project, ScanPrel scan, - JsonTableGroupScan groupScan, RexNode condition) { - - if (groupScan.isFilterPushedDown()) { - /* - * The rule can get triggered again due to the transformed "scan => filter" sequence - * created by the earlier execution of this rule when we could not do a complete - * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon - * this flag to not do a re-processing of the rule on the already transformed call. - */ - return; - } - - LogicalExpression conditionExp = null; - try { - conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition); - } catch (ClassCastException e) { - // MD-771 bug in DrillOptiq.toDrill() causes filter condition on ITEM operator to throw ClassCastException - // For such cases, we return without pushdown - return; - } - final JsonConditionBuilder jsonConditionBuilder = new JsonConditionBuilder(groupScan, conditionExp); - final JsonScanSpec newScanSpec = jsonConditionBuilder.parseTree(); - if (newScanSpec == null) { - return; //no filter pushdown ==> No transformation. - } - - final JsonTableGroupScan newGroupsScan = new JsonTableGroupScan(groupScan.getUserName(), - groupScan.getStoragePlugin(), - groupScan.getFormatPlugin(), - newScanSpec, - groupScan.getColumns()); - newGroupsScan.setFilterPushedDown(true); - - final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType()); - - // Depending on whether is a project in the middle, assign either scan or copy of project to childRel. - final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));; - - if (jsonConditionBuilder.isAllExpressionsConverted()) { - /* - * Since we could convert the entire filter condition expression into an HBase filter, - * we can eliminate the filter operator altogether. - */ - call.transformTo(childRel); - } else { - call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel))); - } - } - - protected void doPushFilterIntoBinaryGroupScan(final RelOptRuleCall call, - final FilterPrel filter, - final ProjectPrel project, - final ScanPrel scan, - final BinaryTableGroupScan groupScan, - final RexNode condition) { - - if (groupScan.isFilterPushedDown()) { - /* - * The rule can get triggered again due to the transformed "scan => filter" sequence - * created by the earlier execution of this rule when we could not do a complete - * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon - * this flag to not do a re-processing of the rule on the already transformed call. - */ - return; - } - - final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition); - final MapRDBFilterBuilder maprdbFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp); - final HBaseScanSpec newScanSpec = maprdbFilterBuilder.parseTree(); - if (newScanSpec == null) { - return; //no filter pushdown ==> No transformation. - } - - final BinaryTableGroupScan newGroupsScan = new BinaryTableGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(), - groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns()); - newGroupsScan.setFilterPushedDown(true); - - final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType()); - - // Depending on whether is a project in the middle, assign either scan or copy of project to childRel. - final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));; - - if (maprdbFilterBuilder.isAllExpressionsConverted()) { - /* - * Since we could convert the entire filter condition expression into an HBase filter, - * we can eliminate the filter operator altogether. - */ - call.transformTo(childRel); - } else { - call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel))); - } - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java deleted file mode 100644 index 1cd33ca..0000000 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.maprdb; - -import java.util.List; - -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.impl.BatchCreator; -import org.apache.drill.exec.physical.impl.ScanBatch; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.store.RecordReader; -import org.apache.drill.exec.store.hbase.HBaseRecordReader; -import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec; -import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan; -import org.apache.drill.exec.store.maprdb.json.MaprDBJsonRecordReader; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -public class MapRDBScanBatchCreator implements BatchCreator{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class); - - @Override - public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List children) throws ExecutionSetupException { - Preconditions.checkArgument(children.isEmpty()); - List readers = Lists.newArrayList(); - Configuration conf = HBaseConfiguration.create(); - for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){ - try { - if (BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) { - readers.add(new HBaseRecordReader(conf, getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context)); - } else { - readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context)); - } - } catch (Exception e1) { - throw new ExecutionSetupException(e1); - } - } - return new ScanBatch(subScan, context, readers.iterator()); - } - - private HBaseSubScanSpec getHBaseSubScanSpec(MapRDBSubScanSpec scanSpec) { - return new HBaseSubScanSpec(scanSpec.getTableName(), scanSpec.getRegionServer(), - scanSpec.getStartRow(), scanSpec.getStopRow(), scanSpec.getSerializedFilter(), null); - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java deleted file mode 100644 index 7ea4cbf..0000000 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScan.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.maprdb; - -import java.util.Iterator; -import java.util.List; - -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.logical.StoragePluginConfig; -import org.apache.drill.exec.physical.base.AbstractBase; -import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.physical.base.PhysicalVisitor; -import org.apache.drill.exec.physical.base.SubScan; -import org.apache.drill.exec.store.StoragePluginRegistry; -import org.apache.drill.exec.store.dfs.FileSystemPlugin; - -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; - -// Class containing information for reading a single HBase region -@JsonTypeName("maprdb-sub-scan") -public class MapRDBSubScan extends AbstractBase implements SubScan { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class); - - @JsonProperty - public final StoragePluginConfig storage; - @JsonIgnore - private final MapRDBFormatPluginConfig fsFormatPluginConfig; - private final FileSystemPlugin fsStoragePlugin; - private final List regionScanSpecList; - private final List columns; - private final String tableType; - - @JsonCreator - public MapRDBSubScan(@JacksonInject StoragePluginRegistry registry, - @JsonProperty("userName") String userName, - @JsonProperty("formatPluginConfig") MapRDBFormatPluginConfig formatPluginConfig, - @JsonProperty("storage") StoragePluginConfig storage, - @JsonProperty("regionScanSpecList") List regionScanSpecList, - @JsonProperty("columns") List columns, - @JsonProperty("tableType") String tableType) throws ExecutionSetupException { - super(userName); - this.fsFormatPluginConfig = formatPluginConfig; - this.fsStoragePlugin = (FileSystemPlugin) registry.getPlugin(storage); - this.regionScanSpecList = regionScanSpecList; - this.storage = storage; - this.columns = columns; - this.tableType = tableType; - } - - public MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig config, - List maprSubScanSpecs, List columns, String tableType) { - super(userName); - fsFormatPluginConfig = formatPluginConfig; - fsStoragePlugin = storagePlugin; - storage = config; - this.regionScanSpecList = maprSubScanSpecs; - this.columns = columns; - this.tableType = tableType; - } - - public List getRegionScanSpecList() { - return regionScanSpecList; - } - - public List getColumns() { - return columns; - } - - @Override - public boolean isExecutable() { - return false; - } - - @Override - public T accept(PhysicalVisitor physicalVisitor, X value) throws E { - return physicalVisitor.visitSubScan(this, value); - } - - @Override - public PhysicalOperator getNewWithChildren(List children) { - Preconditions.checkArgument(children.isEmpty()); - return new MapRDBSubScan(getUserName(), fsFormatPluginConfig, fsStoragePlugin, storage, regionScanSpecList, columns, tableType); - } - - @Override - public Iterator iterator() { - return Iterators.emptyIterator(); - } - - @Override - public int getOperatorType() { - return 1001; - } - - public String getTableType() { - return tableType; - } - - public MapRDBFormatPluginConfig getFormatPluginConfig() { - return fsFormatPluginConfig; - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java deleted file mode 100644 index cc8bc5d..0000000 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBSubScanSpec.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.maprdb; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.mapr.fs.jni.MapRConstants; -import com.mapr.org.apache.hadoop.hbase.util.Bytes; - -public class MapRDBSubScanSpec { - - protected String tableName; - protected String regionServer; - protected byte[] startRow; - protected byte[] stopRow; - protected byte[] serializedFilter; - - @JsonCreator - public MapRDBSubScanSpec(@JsonProperty("tableName") String tableName, - @JsonProperty("regionServer") String regionServer, - @JsonProperty("startRow") byte[] startRow, - @JsonProperty("stopRow") byte[] stopRow, - @JsonProperty("serializedFilter") byte[] serializedFilter, - @JsonProperty("filterString") String filterString) { - if (serializedFilter != null && filterString != null) { - throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time."); - } - this.tableName = tableName; - this.regionServer = regionServer; - this.startRow = startRow; - this.stopRow = stopRow; - this.serializedFilter = serializedFilter; - } - - /* package */ MapRDBSubScanSpec() { - // empty constructor, to be used with builder pattern; - } - - public String getTableName() { - return tableName; - } - - public MapRDBSubScanSpec setTableName(String tableName) { - this.tableName = tableName; - return this; - } - - public String getRegionServer() { - return regionServer; - } - - public MapRDBSubScanSpec setRegionServer(String regionServer) { - this.regionServer = regionServer; - return this; - } - - /** - * @return the raw (not-encoded) start row key for this sub-scan - */ - public byte[] getStartRow() { - return startRow == null ? MapRConstants.EMPTY_BYTE_ARRAY: startRow; - } - - public MapRDBSubScanSpec setStartRow(byte[] startRow) { - this.startRow = startRow; - return this; - } - - /** - * @return the raw (not-encoded) stop row key for this sub-scan - */ - public byte[] getStopRow() { - return stopRow == null ? MapRConstants.EMPTY_BYTE_ARRAY : stopRow; - } - - public MapRDBSubScanSpec setStopRow(byte[] stopRow) { - this.stopRow = stopRow; - return this; - } - - public byte[] getSerializedFilter() { - return serializedFilter; - } - - public MapRDBSubScanSpec setSerializedFilter(byte[] serializedFilter) { - this.serializedFilter = serializedFilter; - return this; - } - - @Override - public String toString() { - return "MapRDBSubScanSpec [tableName=" + tableName - + ", startRow=" + (startRow == null ? null : Bytes.toStringBinary(startRow)) - + ", stopRow=" + (stopRow == null ? null : Bytes.toStringBinary(stopRow)) - + ", filter=" + (getSerializedFilter() == null ? null : Bytes.toBase64(getSerializedFilter())) - + ", regionServer=" + regionServer + "]"; - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java deleted file mode 100644 index d2b1453..0000000 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBTableStats.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.maprdb; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.mapr.TableMappingRulesFactory; - -import com.mapr.fs.hbase.HBaseAdminImpl; - -public class MapRDBTableStats { - private static volatile HBaseAdminImpl admin = null; - - private long numRows; - - public MapRDBTableStats(Configuration conf, String tablePath) throws Exception { - if (admin == null) { - synchronized (MapRDBTableStats.class) { - if (admin == null) { - Configuration config = conf; - admin = new HBaseAdminImpl(config, TableMappingRulesFactory.create(conf)); - } - } - } - numRows = admin.getNumRows(tablePath); - } - - public long getNumRows() { - return numRows; - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java deleted file mode 100644 index 389f00d..0000000 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/TabletFragmentInfo.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.maprdb; - -import org.apache.hadoop.hbase.HRegionInfo; - -import com.mapr.db.impl.TabletInfoImpl; - -public class TabletFragmentInfo implements Comparable { - - final private HRegionInfo regionInfo; - final private TabletInfoImpl tabletInfoImpl; - - public TabletFragmentInfo(HRegionInfo regionInfo) { - this(null, regionInfo); - } - - public TabletFragmentInfo(TabletInfoImpl tabletInfoImpl) { - this(tabletInfoImpl, null); - } - - TabletFragmentInfo(TabletInfoImpl tabletInfoImpl, HRegionInfo regionInfo) { - this.regionInfo = regionInfo; - this.tabletInfoImpl = tabletInfoImpl; - } - - public HRegionInfo getRegionInfo() { - return regionInfo; - } - - public TabletInfoImpl getTabletInfoImpl() { - return tabletInfoImpl; - } - - public boolean containsRow(byte[] row) { - return tabletInfoImpl != null ? tabletInfoImpl.containsRow(row) : - regionInfo.containsRow(row); - } - - public byte[] getStartKey() { - return tabletInfoImpl != null ? tabletInfoImpl.getStartRow() : - regionInfo.getStartKey(); - } - - public byte[] getEndKey() { - return tabletInfoImpl != null ? tabletInfoImpl.getStopRow() : - regionInfo.getEndKey(); - } - - @Override - public int compareTo(TabletFragmentInfo o) { - return tabletInfoImpl != null ? tabletInfoImpl.compareTo(o.tabletInfoImpl) : - regionInfo.compareTo(o.regionInfo); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((regionInfo == null) ? 0 : regionInfo.hashCode()); - result = prime * result + ((tabletInfoImpl == null) ? 0 : tabletInfoImpl.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - TabletFragmentInfo other = (TabletFragmentInfo) obj; - if (regionInfo == null) { - if (other.regionInfo != null) - return false; - } else if (!regionInfo.equals(other.regionInfo)) - return false; - if (tabletInfoImpl == null) { - if (other.tabletInfoImpl != null) - return false; - } else if (!tabletInfoImpl.equals(other.tabletInfoImpl)) - return false; - return true; - } - - @Override - public String toString() { - return "TabletFragmentInfo [regionInfo=" + regionInfo + ", tabletInfoImpl=" + tabletInfoImpl - + "]"; - } - -}