Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 39574180D6 for ; Mon, 11 Jan 2016 07:53:50 +0000 (UTC) Received: (qmail 88539 invoked by uid 500); 11 Jan 2016 07:53:50 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 88492 invoked by uid 500); 11 Jan 2016 07:53:50 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 88474 invoked by uid 99); 11 Jan 2016 07:53:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Jan 2016 07:53:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D9F7EE041F; Mon, 11 Jan 2016 07:53:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jacques@apache.org To: commits@drill.apache.org Date: Mon, 11 Jan 2016 07:53:49 -0000 Message-Id: <90208e6d24734ec8a04a65532abf2713@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/12] drill git commit: DRILL-4241: initial commit Repository: drill Updated Branches: refs/heads/master f964908ae -> 392d1f7e9 DRILL-4241: initial commit Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5dfb4512 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5dfb4512 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5dfb4512 Branch: refs/heads/master Commit: 5dfb451222b4259e274304bb360e2b000bcd26af Parents: f964908 Author: Jacques Nadeau Authored: Wed Nov 18 18:45:56 2015 -0800 Committer: Jacques Nadeau Committed: Sun 
Jan 10 22:46:16 2016 -0800 ---------------------------------------------------------------------- contrib/storage-kudu/.gitignore | 15 ++ contrib/storage-kudu/README.md | 1 + contrib/storage-kudu/pom.xml | 87 ++++++++ .../drill/exec/store/kudu/DrillKuduTable.java | 36 ++++ .../drill/exec/store/kudu/KuduGroupScan.java | 203 +++++++++++++++++++ .../drill/exec/store/kudu/KuduRecordReader.java | 178 ++++++++++++++++ .../exec/store/kudu/KuduScanBatchCreator.java | 57 ++++++ .../drill/exec/store/kudu/KuduScanSpec.java | 38 ++++ .../exec/store/kudu/KuduSchemaFactory.java | 95 +++++++++ .../exec/store/kudu/KuduStoragePlugin.java | 99 +++++++++ .../store/kudu/KuduStoragePluginConfig.java | 75 +++++++ .../drill/exec/store/kudu/KuduSubScan.java | 135 ++++++++++++ .../resources/bootstrap-storage-plugins.json | 9 + .../src/main/resources/drill-module.conf | 24 +++ .../drill/store/kudu/TestKuduConnect.java | 76 +++++++ .../apache/drill/store/kudu/TestKuduPlugin.java | 12 ++ .../drill/store/kudu/TestKuduTableProvider.java | 91 +++++++++ .../storage-kudu/src/test/resources/logback.xml | 64 ++++++ 18 files changed, 1295 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/.gitignore ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/.gitignore b/contrib/storage-kudu/.gitignore new file mode 100644 index 0000000..f290bae --- /dev/null +++ b/contrib/storage-kudu/.gitignore @@ -0,0 +1,15 @@ +.project +.buildpath +.classpath +.checkstyle +.settings/ +.idea/ +TAGS +*.log +*.lck +*.iml +target/ +*.DS_Store +*.patch +*~ +git.properties http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/README.md ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/README.md b/contrib/storage-kudu/README.md new file mode 100644 index 0000000..f4597d4 --- /dev/null 
+++ b/contrib/storage-kudu/README.md @@ -0,0 +1 @@ +# drill-storage-kudu http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/pom.xml b/contrib/storage-kudu/pom.xml new file mode 100644 index 0000000..7e57ca8 --- /dev/null +++ b/contrib/storage-kudu/pom.xml @@ -0,0 +1,87 @@ + + + + 4.0.0 + + drill-contrib-parent + org.apache.drill.contrib + 1.3.0 + + + drill-storage-kudu + 1.3.0-SNAPSHOT + + contrib/kudu-storage-plugin + + + 1.3.0 + + + + + org.apache.drill.exec + drill-java-exec + ${drill.version} + + + + + org.apache.drill.exec + drill-java-exec + tests + ${drill.version} + test + + + + org.apache.drill + drill-common + tests + ${drill.version} + test + + + + org.kududb + kudu-client + 0.5.0 + + + + + + + drill-1016 + https://repository.apache.org/content/repositories/orgapachedrill-1016/ + + false + + + + cdh.repo + Cloudera Repositories + https://repository.cloudera.com/artifactory/cloudera-repos + + false + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java new file mode 100644 index 0000000..3b9c757 --- /dev/null +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kudu; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.drill.exec.planner.logical.DynamicDrillTable; + +public class DrillKuduTable extends DynamicDrillTable { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillKuduTable.class); + + public DrillKuduTable(String storageEngineName, KuduStoragePlugin plugin, KuduScanSpec scanSpec) { + super(plugin, storageEngineName, scanSpec); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return super.getRowType(typeFactory); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java new file mode 100644 index 0000000..bc543d9 --- /dev/null +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kudu; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.EndpointAffinity; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.kudu.KuduSubScan.KuduSubScanSpec; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +@JsonTypeName("kudu-scan") +public class KuduGroupScan extends AbstractGroupScan { + static final org.slf4j.Logger 
logger = org.slf4j.LoggerFactory.getLogger(KuduGroupScan.class); + + private KuduStoragePluginConfig storagePluginConfig; + private List columns; + private KuduScanSpec kuduScanSpec; + private KuduStoragePlugin storagePlugin; + private boolean filterPushedDown = false; + + + @JsonCreator + public KuduGroupScan(@JsonProperty("kuduScanSpec") KuduScanSpec kuduScanSpec, + @JsonProperty("storage") KuduStoragePluginConfig storagePluginConfig, + @JsonProperty("columns") List columns, + @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException { + this((KuduStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), kuduScanSpec, columns); + } + + public KuduGroupScan(KuduStoragePlugin storagePlugin, KuduScanSpec scanSpec, + List columns) { + super((String) null); + this.storagePlugin = storagePlugin; + this.storagePluginConfig = storagePlugin.getConfig(); + this.kuduScanSpec = scanSpec; + this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns; + } + + /** + * Private constructor, used for cloning. 
+ * @param that The KuduGroupScan to clone + */ + private KuduGroupScan(KuduGroupScan that) { + super(that); + this.columns = that.columns; + this.kuduScanSpec = that.kuduScanSpec; + this.storagePlugin = that.storagePlugin; + this.storagePluginConfig = that.storagePluginConfig; + this.filterPushedDown = that.filterPushedDown; + } + + @Override + public GroupScan clone(List columns) { + KuduGroupScan newScan = new KuduGroupScan(this); + newScan.columns = columns; + return newScan; + } + + @Override + public List getOperatorAffinity() { + return Collections.EMPTY_LIST; + } + + + @Override + public int getMaxParallelizationWidth() { + return 1; + } + + + /** + * + * @param incomingEndpoints + */ + @Override + public void applyAssignments(List incomingEndpoints) { + } + + + @Override + public KuduSubScan getSpecificScan(int minorFragmentId) { + return new KuduSubScan(storagePlugin, storagePluginConfig, + ImmutableList.of(new KuduSubScanSpec(kuduScanSpec.getTableName())), + this.columns); + } + + // KuduStoragePlugin plugin, KuduStoragePluginConfig config, + // List tabletInfoList, List columns + @Override + public ScanStats getScanStats() { + return ScanStats.TRIVIAL_TABLE; + } + + @Override + @JsonIgnore + public PhysicalOperator getNewWithChildren(List children) { + Preconditions.checkArgument(children.isEmpty()); + return new KuduGroupScan(this); + } + + @JsonIgnore + public KuduStoragePlugin getStoragePlugin() { + return storagePlugin; + } + + @JsonIgnore + public String getTableName() { + return getKuduScanSpec().getTableName(); + } + + @Override + public String getDigest() { + return toString(); + } + + @Override + public String toString() { + return "KuduGroupScan [KuduScanSpec=" + + kuduScanSpec + ", columns=" + + columns + "]"; + } + + @JsonProperty("storage") + public KuduStoragePluginConfig getStorageConfig() { + return this.storagePluginConfig; + } + + @JsonProperty + public List getColumns() { + return columns; + } + + @JsonProperty + public KuduScanSpec 
getKuduScanSpec() { + return kuduScanSpec; + } + + @Override + @JsonIgnore + public boolean canPushdownProjects(List columns) { + return true; + } + + @JsonIgnore + public void setFilterPushedDown(boolean b) { + this.filterPushedDown = true; + } + + @JsonIgnore + public boolean isFilterPushedDown() { + return filterPushedDown; + } + + /** + * Empty constructor, do not use, only for testing. + */ + @VisibleForTesting + public KuduGroupScan() { + super((String)null); + } + + /** + * Do not use, only for testing. + */ + @VisibleForTesting + public void setKuduScanSpec(KuduScanSpec kuduScanSpec) { + this.kuduScanSpec = kuduScanSpec; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java new file mode 100644 index 0000000..0200527 --- /dev/null +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kudu; + +import io.netty.buffer.DrillBuf; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.kudu.KuduSubScan.KuduSubScanSpec; +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; +import org.kududb.ColumnSchema; +import org.kududb.Type; +import org.kududb.client.KuduClient; +import org.kududb.client.KuduScanner; +import org.kududb.client.KuduTable; +import org.kududb.client.RowResult; +import org.kududb.client.RowResultIterator; +import org.kududb.client.shaded.com.google.common.collect.ImmutableMap; + +public class KuduRecordReader extends AbstractRecordReader { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduRecordReader.class); + + private static final int TARGET_RECORD_COUNT = 4000; + + private final KuduClient client; + private final KuduSubScanSpec scanSpec; + private KuduTable table; + private VectorContainerWriter containerWriter; + private MapWriter writer; + private KuduScanner scanner; + private RowResultIterator iterator; + private DrillBuf buffer; + + public KuduRecordReader(KuduClient client, KuduSubScan.KuduSubScanSpec subScanSpec, + List projectedColumns, FragmentContext context) { + 
setColumns(projectedColumns); + this.client = client; + buffer = context.getManagedBuffer(); + scanSpec = subScanSpec; + } + + @Override + public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { + try { + KuduTable table = client.openTable(scanSpec.getTableName()); + scanner = client.newScannerBuilder(table).build(); + containerWriter = new VectorContainerWriter(output); + writer = containerWriter.rootAsMap(); + } catch (Exception e) { + throw new ExecutionSetupException(e); + } + } + + static final Map TYPES; + + static { + TYPES = ImmutableMap. builder() + .put(Type.BINARY, Types.optional(MinorType.VARBINARY)) + .put(Type.BOOL, Types.optional(MinorType.BIT)) + .put(Type.DOUBLE, Types.optional(MinorType.FLOAT8)) + .put(Type.FLOAT, Types.optional(MinorType.FLOAT4)) + .put(Type.INT16, Types.optional(MinorType.INT)) + .put(Type.INT32, Types.optional(MinorType.INT)) + .put(Type.INT8, Types.optional(MinorType.INT)) + .put(Type.INT64, Types.optional(MinorType.BIGINT)) + .put(Type.STRING, Types.optional(MinorType.VARCHAR)) + .put(Type.TIMESTAMP, Types.optional(MinorType.TIMESTAMP)) + .build(); + } + + @Override + public int next() { + int rowCount = 0; + try { + while (iterator == null || !iterator.hasNext()) { + if (!scanner.hasMoreRows()) { + iterator = null; + break; + } + iterator = scanner.nextRows(); + + for (; rowCount < 4095 && iterator.hasNext(); rowCount++) { + writer.setPosition(rowCount); + RowResult result = iterator.next(); + int i = 0; + for (ColumnSchema column : result.getColumnProjection().getColumns()) { + switch (column.getType()) { + case STRING: { + final ByteBuffer buf = result.getBinary(i); + final int length = buf.remaining(); + ensure(length); + buffer.setBytes(0, buf); + writer.varChar(column.getName()).writeVarChar(0, length, buffer); + break; + } + case BINARY: { + final ByteBuffer buf = result.getBinary(i); + final int length = buf.remaining(); + ensure(length); + buffer.setBytes(0, buf); + 
writer.varBinary(column.getName()).writeVarBinary(0, length, buffer); + break; + } + case INT8: + writer.integer(column.getName()).writeInt(result.getByte(i)); + break; + case INT16: + writer.integer(column.getName()).writeInt(result.getShort(i)); + break; + case INT32: + writer.integer(column.getName()).writeInt(result.getInt(i)); + break; + case INT64: + writer.bigInt(column.getName()).writeBigInt(result.getLong(i)); + break; + case FLOAT: + writer.float4(column.getName()).writeFloat4(result.getFloat(i)); + break; + case DOUBLE: + writer.float8(column.getName()).writeFloat8(result.getDouble(i)); + break; + case BOOL: + writer.bit(column.getName()).writeBit(result.getBoolean(i) ? 1 : 0); + break; + case TIMESTAMP: + writer.timeStamp(column.getName()).writeTimeStamp(result.getLong(i) / 1000); + break; + default: + throw new UnsupportedOperationException("unsupported type " + column.getType()); + } + + i++; + } + } + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + containerWriter.setValueCount(rowCount); + return rowCount; + } + + private void ensure(final int length) { + buffer = buffer.reallocIfNeeded(length); + } + + @Override + public void close() { + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java new file mode 100644 index 0000000..b3c2c4e --- /dev/null +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kudu; + +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.ScanBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.store.RecordReader; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +public class KuduScanBatchCreator implements BatchCreator{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduScanBatchCreator.class); + + @Override + public ScanBatch getBatch(FragmentContext context, KuduSubScan subScan, List children) + throws ExecutionSetupException { + Preconditions.checkArgument(children.isEmpty()); + List readers = Lists.newArrayList(); + List columns = null; + + for (KuduSubScan.KuduSubScanSpec scanSpec : subScan.getTabletScanSpecList()) { + try { + if ((columns = subScan.getColumns())==null) { + columns = GroupScan.ALL_COLUMNS; + } + readers.add(new KuduRecordReader(subScan.getStorageEngine().getClient(), scanSpec, columns, 
context)); + } catch (Exception e1) { + throw new ExecutionSetupException(e1); + } + } + return new ScanBatch(subScan, context, readers.iterator()); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java new file mode 100644 index 0000000..b669f79 --- /dev/null +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kudu; + + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class KuduScanSpec { + + private final String tableName; + + @JsonCreator + public KuduScanSpec(@JsonProperty("tableName") String tableName) { + this.tableName = tableName; + } + + public String getTableName() { + return tableName; + } + + +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java new file mode 100644 index 0000000..294eabe --- /dev/null +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kudu; + +import java.io.IOException; +import java.util.Collections; +import java.util.Set; + +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.drill.exec.store.AbstractSchema; +import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.store.SchemaFactory; +import org.kududb.client.ListTablesResponse; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; + +public class KuduSchemaFactory implements SchemaFactory { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSchemaFactory.class); + + final String schemaName; + final KuduStoragePlugin plugin; + + public KuduSchemaFactory(KuduStoragePlugin plugin, String name) throws IOException { + this.plugin = plugin; + this.schemaName = name; + } + + @Override + public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { + KuduTables schema = new KuduTables(schemaName); + SchemaPlus hPlus = parent.add(schemaName, schema); + schema.setHolder(hPlus); + } + + class KuduTables extends AbstractSchema { + + public KuduTables(String name) { + super(ImmutableList.of(), name); + } + + public void setHolder(SchemaPlus plusOfThis) { + } + + @Override + public AbstractSchema getSubSchema(String name) { + return null; + } + + @Override + public Set getSubSchemaNames() { + return Collections.emptySet(); + } + + @Override + public Table getTable(String name) { + KuduScanSpec scanSpec = new KuduScanSpec(name); + return new DrillKuduTable(schemaName, plugin, scanSpec); + } + + @Override + public Set getTableNames() { + try { + ListTablesResponse tablesList = plugin.getClient().getTablesList(); + return Sets.newHashSet(tablesList.getTablesList()); + } catch (Exception e) { + logger.warn("Failure reading kudu tables.", e); + return Collections.EMPTY_SET; + } + } + + @Override + public String getTypeName() { + return 
KuduStoragePluginConfig.NAME; + } + + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java new file mode 100644 index 0000000..5e981b8 --- /dev/null +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kudu; + +import java.io.IOException; +import java.util.Collections; +import java.util.Set; + +import org.apache.calcite.schema.SchemaPlus; +import org.apache.drill.common.JSONOptions; +import org.apache.drill.exec.ops.OptimizerRulesContext; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.AbstractStoragePlugin; +import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.kududb.client.KuduClient; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class KuduStoragePlugin extends AbstractStoragePlugin { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduStoragePlugin.class); + + private final DrillbitContext context; + private final KuduStoragePluginConfig engineConfig; + private final KuduSchemaFactory schemaFactory; + + @SuppressWarnings("unused") + private final String name; + private final KuduClient client; + + public KuduStoragePlugin(KuduStoragePluginConfig configuration, DrillbitContext context, String name) + throws IOException { + this.context = context; + this.schemaFactory = new KuduSchemaFactory(this, name); + this.engineConfig = configuration; + this.name = name; + this.client = new KuduClient.KuduClientBuilder(configuration.getMasterAddresses()).build(); + } + + @Override + public void start() throws IOException { + + } + + public KuduClient getClient() { + return client; + } + + @Override + public void close() throws Exception { + client.close(); + } + + public DrillbitContext getContext() { + return this.context; + } + + @Override + public boolean supportsRead() { + return true; + } + + @Override + public KuduGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException { + KuduScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference() {}); + return new 
KuduGroupScan(this, scanSpec, null); + } + + @Override + public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { + schemaFactory.registerSchemas(schemaConfig, parent); + } + + @Override + public KuduStoragePluginConfig getConfig() { + return engineConfig; + } + + @Override + public Set getOptimizerRules(OptimizerRulesContext optimizerRulesContext) { + return Collections.EMPTY_SET; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java new file mode 100644 index 0000000..e07f967 --- /dev/null +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kudu; + +import org.apache.drill.common.logical.StoragePluginConfigBase; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +/** + * Configuration for the Kudu storage plugin, registered with Jackson under the type name {@value #NAME}. Its only setting is the master address string; equality and hashing depend on that value alone. + */ +@JsonTypeName(KuduStoragePluginConfig.NAME) +public class KuduStoragePluginConfig extends StoragePluginConfigBase { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduStoragePluginConfig.class); + + /** Jackson type name of this configuration. */ + public static final String NAME = "kudu"; + + private final String masterAddresses; + + /** + * @param masterAddresses value of the {@code masterAddresses} JSON property; stored as given and may be null + */ + @JsonCreator + public KuduStoragePluginConfig(@JsonProperty("masterAddresses") String masterAddresses) { + this.masterAddresses = masterAddresses; + } + + /** @return the master address string exactly as supplied to the constructor */ + public String getMasterAddresses() { + return masterAddresses; + } + + @Override + public int hashCode() { + final int addressHash = (masterAddresses == null) ? 0 : masterAddresses.hashCode(); + return 31 + addressHash; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final String theirs = ((KuduStoragePluginConfig) obj).masterAddresses; + if (masterAddresses == null) { + return theirs == null; + } + return masterAddresses.equals(theirs); + } + + + +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java new file mode 100644 index 0000000..267ee77 --- /dev/null +++ 
b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kudu; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; +import org.apache.drill.exec.store.StoragePluginRegistry; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; + +// Class containing information 
for reading a single Kudu tablet +@JsonTypeName("kudu-tablet-scan") +public class KuduSubScan extends AbstractBase implements SubScan { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSubScan.class); + + /** Plugin configuration; exposed to Jackson as a property through the field itself. */ + @JsonProperty + public final KuduStoragePluginConfig storage; + + + private final KuduStoragePlugin kuduStoragePlugin; + private final List<KuduSubScanSpec> tabletScanSpecList; + private final List<SchemaPath> columns; + + /** + * Deserialization constructor: looks up the plugin instance for {@code storage} in the injected registry. + * + * @param registry injected plugin registry used to resolve {@code storage} + * @param storage plugin configuration; cast to {@link KuduStoragePluginConfig} + * @param tabletScanSpecList scan specs carried by this sub scan + * @param columns columns carried by this sub scan + * @throws ExecutionSetupException presumably raised by the registry lookup (confirm against StoragePluginRegistry) + */ + @JsonCreator + public KuduSubScan(@JacksonInject StoragePluginRegistry registry, + @JsonProperty("storage") StoragePluginConfig storage, + @JsonProperty("tabletScanSpecList") LinkedList<KuduSubScanSpec> tabletScanSpecList, + @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException { + super((String) null); + kuduStoragePlugin = (KuduStoragePlugin) registry.getPlugin(storage); + this.tabletScanSpecList = tabletScanSpecList; + this.storage = (KuduStoragePluginConfig) storage; + this.columns = columns; + } + + /** Direct constructor for callers that already hold the plugin instance and its configuration. */ + public KuduSubScan(KuduStoragePlugin plugin, KuduStoragePluginConfig config, + List<KuduSubScanSpec> tabletInfoList, List<SchemaPath> columns) { + super((String) null); + kuduStoragePlugin = plugin; + storage = config; + this.tabletScanSpecList = tabletInfoList; + this.columns = columns; + } + + public List<KuduSubScanSpec> getTabletScanSpecList() { + return tabletScanSpecList; + } + + @JsonIgnore + public KuduStoragePluginConfig getStorageConfig() { + return storage; + } + + public List<SchemaPath> getColumns() { + return columns; + } + + @Override + public boolean isExecutable() { + return false; + } + + @JsonIgnore + public KuduStoragePlugin getStorageEngine(){ + return kuduStoragePlugin; + } + + /** Dispatches to {@code visitSubScan} on the given visitor. */ + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitSubScan(this, value); + } + + /** Returns a copy of this sub scan; {@code children} must be empty. */ + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { + Preconditions.checkArgument(children.isEmpty()); + return new KuduSubScan(kuduStoragePlugin, storage, tabletScanSpecList, columns); + } + + /** A sub scan has no child operators, so the iterator is always empty. */ + @Override + public Iterator<PhysicalOperator> iterator() { + return 
Iterators.emptyIterator(); + } + + /** Scan spec that identifies the target by table name only. */ + public static class KuduSubScanSpec { + + private final String tableName; + + @JsonCreator + public KuduSubScanSpec(@JsonProperty("tableName") String tableName) { + this.tableName = tableName; + } + + public String getTableName() { + return tableName; + } + + } + + @Override + public int getOperatorType() { + /* NOTE(review): this reports the HBase sub-scan operator type; no Kudu-specific constant is visible here. Confirm whether CoreOperatorType should gain one. */ + return CoreOperatorType.HBASE_SUB_SCAN_VALUE; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json new file mode 100644 index 0000000..3ba12c0 --- /dev/null +++ b/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json @@ -0,0 +1,9 @@ +{ + "storage":{ + kudu : { + type:"kudu", + masterAddresses: "172.31.1.99", + enabled: true + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/main/resources/drill-module.conf b/contrib/storage-kudu/src/main/resources/drill-module.conf new file mode 100644 index 0000000..e225600 --- /dev/null +++ b/contrib/storage-kudu/src/main/resources/drill-module.conf @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// This file tells Drill to consider this module when class path scanning. +// This file can also include any supplementary configuration information. +// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. + +drill: { + classpath.scanning: { + packages += "org.apache.drill.exec.store.kudu" + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java new file mode 100644 index 0000000..a6fc69c --- /dev/null +++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java @@ -0,0 +1,76 @@ +package org.apache.drill.store.kudu; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; +import org.kududb.ColumnSchema; +import org.kududb.Schema; +import org.kududb.Type; +import org.kududb.client.Insert; +import org.kududb.client.KuduClient; +import org.kududb.client.KuduScanner; +import org.kududb.client.KuduSession; +import org.kududb.client.KuduTable; +import org.kududb.client.ListTablesResponse; +import org.kududb.client.PartialRow; +import org.kududb.client.RowResult; +import org.kududb.client.RowResultIterator; + + +/** + * Connectivity smoke test that talks to a Kudu master at the hard-coded address {@link #KUDU_MASTER}; it cannot pass without that server being reachable. + */ +public class TestKuduConnect { + static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestKuduConnect.class); + + public static final String KUDU_MASTER = "172.31.1.99"; + public static final String KUDU_TABLE = "demo"; + + /** + * Drops {@link #KUDU_TABLE} if it is listed, recreates it with five columns, inserts three rows, then scans the "float" column and prints every returned row. + */ + @Test + public void abc() throws Exception { + + try (KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build()) { + + ListTablesResponse tables = client.getTablesList(KUDU_TABLE); + if (!tables.getTablesList().isEmpty()) { + client.deleteTable(KUDU_TABLE); + } + + + List<ColumnSchema> columns = new ArrayList<>(5); + columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("binary", Type.BINARY).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("boolean", Type.BOOL).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("float", Type.FLOAT).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).build()); + + Schema schema = new Schema(columns); + client.createTable(KUDU_TABLE, schema); + + KuduTable table = client.openTable(KUDU_TABLE); + + KuduSession session = client.newSession(); + for (int i = 0; i < 3; i++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addInt(0, i); + /* Name the charset so the stored bytes do not depend on the platform default. */ + row.addBinary(1, ("Row " + i).getBytes(java.nio.charset.StandardCharsets.UTF_8)); + row.addBoolean(2, i % 2 == 0); + row.addFloat(3, i + 0.01f); + row.addString(4, ("Row " + i)); + session.apply(insert); + } + + List<String> projectColumns = new ArrayList<>(1); + projectColumns.add("float"); + KuduScanner scanner = client.newScannerBuilder(table) + .setProjectedColumnNames(projectColumns) + .build(); + while (scanner.hasMoreRows()) { + RowResultIterator results = scanner.nextRows(); + while (results.hasNext()) { + RowResult result = results.next(); + System.out.println(result.toStringLongFormat()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java 
---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java new file mode 100644 index 0000000..1a6211f --- /dev/null +++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java @@ -0,0 +1,12 @@ +package org.apache.drill.store.kudu; + +import org.apache.drill.BaseTestQuery; +import org.junit.Test; + +/** Runs a query against the "kudu" storage plugin through the {@link BaseTestQuery} harness. */ +public class TestKuduPlugin extends BaseTestQuery { + + /** Submits a select-all over {@code kudu.demo}; the test makes no assertion on the rows returned. */ + @Test + public void testBasicQuery() throws Exception { + final String selectAllFromDemo = "select * from kudu.demo;"; + test(selectAllFromDemo); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduTableProvider.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduTableProvider.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduTableProvider.java new file mode 100644 index 0000000..ca8f9e1 --- /dev/null +++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduTableProvider.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.store.kudu; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.drill.common.config.LogicalPlanPersistence; +import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory; +import org.apache.drill.exec.store.kudu.config.KuduPStoreProvider; +import org.apache.drill.exec.store.sys.PStore; +import org.apache.drill.exec.store.sys.PStoreConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Exercises two named String stores obtained from a {@link KuduPStoreProvider}: writes a fixed set of keys to each, reads two of them back, and counts the entries by iteration. + * NOTE(review): BaseKuduTest, KuduPStoreProvider and storagePluginConfig.getKuduConf() are not defined in the visible part of this commit; confirm they exist. + */ +public class TestKuduTableProvider extends BaseKuduTest { + + private static KuduPStoreProvider provider; + + @BeforeClass // mask Kudu cluster start function + public static void setUpBeforeTestKuduTableProvider() throws Exception { + provider = new KuduPStoreProvider(storagePluginConfig.getKuduConf(), "drill_store"); + provider.start(); + } + + @Test + public void testTableProvider() throws IOException { + LogicalPlanPersistence lp = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(config); + PStore<String> kuduStore = provider.getStore(PStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("kudu").build()); + kuduStore.put("", "v0"); + kuduStore.put("k1", "v1"); + kuduStore.put("k2", "v2"); + kuduStore.put("k3", "v3"); + kuduStore.put("k4", "v4"); + kuduStore.put("k5", "v5"); + kuduStore.put(".test", "testValue"); + + assertEquals("v0", kuduStore.get("")); + assertEquals("testValue", kuduStore.get(".test")); + + int rowCount = 0; + for (Entry<String, String> entry : kuduStore) { + rowCount++; + System.out.println(entry.getKey() + "=" + entry.getValue()); + } + assertEquals(7, rowCount); + + PStore<String> kuduTestStore = provider.getStore(PStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("kudu.test").build()); + kuduTestStore.put("", "v0"); + kuduTestStore.put("k1", "v1"); + kuduTestStore.put("k2", "v2"); + 
kuduTestStore.put("k3", "v3"); + kuduTestStore.put("k4", "v4"); + kuduTestStore.put(".test", "testValue"); + + /* Read back from the second store; checking kuduStore again here would leave kuduTestStore unverified. */ + assertEquals("v0", kuduTestStore.get("")); + assertEquals("testValue", kuduTestStore.get(".test")); + + rowCount = 0; + for (Entry<String, String> entry : kuduTestStore) { + rowCount++; + System.out.println(entry.getKey() + "=" + entry.getValue()); + } + assertEquals(6, rowCount); + } + + @AfterClass + public static void tearDownTestKuduTableProvider() { + if (provider != null) { + provider.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/test/resources/logback.xml ---------------------------------------------------------------------- diff --git a/contrib/storage-kudu/src/test/resources/logback.xml b/contrib/storage-kudu/src/test/resources/logback.xml new file mode 100644 index 0000000..6ef172b --- /dev/null +++ b/contrib/storage-kudu/src/test/resources/logback.xml @@ -0,0 +1,64 @@ + + + + + + + true + 10000 + true + ${LILITH_HOSTNAME:-localhost} + + + + + ${logback.log.dir:-./target/surefire-reports}/hbase-tests-${bySecond}.log + false + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + + + + + + + + + +