drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [01/12] drill git commit: DRILL-4241: initial commit
Date Mon, 11 Jan 2016 07:53:49 GMT
Repository: drill
Updated Branches:
  refs/heads/master f964908ae -> 392d1f7e9


DRILL-4241: initial commit


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5dfb4512
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5dfb4512
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5dfb4512

Branch: refs/heads/master
Commit: 5dfb451222b4259e274304bb360e2b000bcd26af
Parents: f964908
Author: Jacques Nadeau <jacques.drill@gmail.com>
Authored: Wed Nov 18 18:45:56 2015 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sun Jan 10 22:46:16 2016 -0800

----------------------------------------------------------------------
 contrib/storage-kudu/.gitignore                 |  15 ++
 contrib/storage-kudu/README.md                  |   1 +
 contrib/storage-kudu/pom.xml                    |  87 ++++++++
 .../drill/exec/store/kudu/DrillKuduTable.java   |  36 ++++
 .../drill/exec/store/kudu/KuduGroupScan.java    | 203 +++++++++++++++++++
 .../drill/exec/store/kudu/KuduRecordReader.java | 178 ++++++++++++++++
 .../exec/store/kudu/KuduScanBatchCreator.java   |  57 ++++++
 .../drill/exec/store/kudu/KuduScanSpec.java     |  38 ++++
 .../exec/store/kudu/KuduSchemaFactory.java      |  95 +++++++++
 .../exec/store/kudu/KuduStoragePlugin.java      |  99 +++++++++
 .../store/kudu/KuduStoragePluginConfig.java     |  75 +++++++
 .../drill/exec/store/kudu/KuduSubScan.java      | 135 ++++++++++++
 .../resources/bootstrap-storage-plugins.json    |   9 +
 .../src/main/resources/drill-module.conf        |  24 +++
 .../drill/store/kudu/TestKuduConnect.java       |  76 +++++++
 .../apache/drill/store/kudu/TestKuduPlugin.java |  12 ++
 .../drill/store/kudu/TestKuduTableProvider.java |  91 +++++++++
 .../storage-kudu/src/test/resources/logback.xml |  64 ++++++
 18 files changed, 1295 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/.gitignore
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/.gitignore b/contrib/storage-kudu/.gitignore
new file mode 100644
index 0000000..f290bae
--- /dev/null
+++ b/contrib/storage-kudu/.gitignore
@@ -0,0 +1,15 @@
+.project
+.buildpath
+.classpath
+.checkstyle
+.settings/
+.idea/
+TAGS
+*.log
+*.lck
+*.iml
+target/
+*.DS_Store
+*.patch
+*~
+git.properties

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/README.md
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/README.md b/contrib/storage-kudu/README.md
new file mode 100644
index 0000000..f4597d4
--- /dev/null
+++ b/contrib/storage-kudu/README.md
@@ -0,0 +1 @@
+# drill-storage-kudu

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/pom.xml b/contrib/storage-kudu/pom.xml
new file mode 100644
index 0000000..7e57ca8
--- /dev/null
+++ b/contrib/storage-kudu/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+  license agreements. See the NOTICE file distributed with this work for additional 
+  information regarding copyright ownership. The ASF licenses this file to 
+  You under the Apache License, Version 2.0 (the "License"); you may not use 
+  this file except in compliance with the License. You may obtain a copy of 
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+  by applicable law or agreed to in writing, software distributed under the 
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+  OF ANY KIND, either express or implied. See the License for the specific 
+  language governing permissions and limitations under the License. -->
+<!-- Maven build descriptor for the Drill Kudu storage plugin module. -->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.3.0</version>
+  </parent>
+
+  <artifactId>drill-storage-kudu</artifactId>
+  <!-- NOTE(review): this module is a -SNAPSHOT while the parent and drill.version are the
+       released 1.3.0; confirm the mismatch is intentional. -->
+  <version>1.3.0-SNAPSHOT</version>
+
+  <name>contrib/kudu-storage-plugin</name>
+
+  <properties>
+    <!-- Version used for every Drill artifact this module depends on. -->
+    <drill.version>1.3.0</drill.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${drill.version}</version>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${drill.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${drill.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Kudu Java client. No <scope> is given, so this is a regular (compile) dependency
+         despite following the test dependencies above; the main sources import org.kududb. -->
+    <dependency>
+      <groupId>org.kududb</groupId>
+      <artifactId>kudu-client</artifactId>
+      <version>0.5.0</version>
+    </dependency>
+
+  </dependencies>
+
+  <repositories>
+    <!-- NOTE(review): judging by the URL this looks like a release staging repository for
+         Drill; confirm it is still needed and still reachable. -->
+    <repository>
+      <id>drill-1016</id>
+      <url>https://repository.apache.org/content/repositories/orgapachedrill-1016/</url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+    <!-- Cloudera repository; presumably where the org.kududb artifacts are resolved from
+         (TODO confirm). -->
+    <repository>
+      <id>cdh.repo</id>
+      <name>Cloudera Repositories</name>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <!-- No module-specific build plugins are configured; the plugins list is intentionally empty. -->
+  <build>
+    <plugins>
+
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java
new file mode 100644
index 0000000..3b9c757
--- /dev/null
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kudu;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+
+public class DrillKuduTable extends DynamicDrillTable {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillKuduTable.class);
+
+  public DrillKuduTable(String storageEngineName, KuduStoragePlugin plugin, KuduScanSpec scanSpec) {
+    super(plugin, storageEngineName, scanSpec);
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    return super.getRowType(typeFactory);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
new file mode 100644
index 0000000..bc543d9
--- /dev/null
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kudu;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kudu.KuduSubScan.KuduSubScanSpec;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Planning-time description of a scan over a single Kudu table.
+ *
+ * <p>The scan is not parallelized: {@link #getMaxParallelizationWidth()} returns 1 and
+ * {@link #getSpecificScan(int)} builds the same one-table sub scan regardless of the
+ * minor fragment id it is given.</p>
+ */
+@JsonTypeName("kudu-scan")
+public class KuduGroupScan extends AbstractGroupScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduGroupScan.class);
+
+  private KuduStoragePluginConfig storagePluginConfig;
+  private List<SchemaPath> columns;
+  private KuduScanSpec kuduScanSpec;
+  private KuduStoragePlugin storagePlugin;
+  private boolean filterPushedDown = false;
+
+  /**
+   * Deserialization constructor. Looks the plugin instance up in the registry using the
+   * supplied config and then delegates to the plugin-based constructor.
+   *
+   * @param kuduScanSpec        table to scan
+   * @param storagePluginConfig config used to resolve the {@link KuduStoragePlugin}
+   * @param columns             projected columns; {@code null} or empty means all columns
+   * @param pluginRegistry      registry injected by Jackson
+   */
+  @JsonCreator
+  public KuduGroupScan(@JsonProperty("kuduScanSpec") KuduScanSpec kuduScanSpec,
+                        @JsonProperty("storage") KuduStoragePluginConfig storagePluginConfig,
+                        @JsonProperty("columns") List<SchemaPath> columns,
+                        @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
+    this((KuduStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), kuduScanSpec, columns);
+  }
+
+  /**
+   * @param storagePlugin plugin the scan runs against; its config is cached on this scan
+   * @param scanSpec      table to scan
+   * @param columns       projected columns; {@code null} or empty is replaced by ALL_COLUMNS
+   */
+  public KuduGroupScan(KuduStoragePlugin storagePlugin, KuduScanSpec scanSpec,
+      List<SchemaPath> columns) {
+    super((String) null);
+    this.storagePlugin = storagePlugin;
+    this.storagePluginConfig = storagePlugin.getConfig();
+    this.kuduScanSpec = scanSpec;
+    this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * @param that The KuduGroupScan to clone
+   */
+  private KuduGroupScan(KuduGroupScan that) {
+    super(that);
+    this.columns = that.columns;
+    this.kuduScanSpec = that.kuduScanSpec;
+    this.storagePlugin = that.storagePlugin;
+    this.storagePluginConfig = that.storagePluginConfig;
+    this.filterPushedDown = that.filterPushedDown;
+  }
+
+  /**
+   * Returns a copy of this scan whose projection is replaced by {@code columns}.
+   * The given list is stored as-is (no null/empty normalization is applied here).
+   */
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    KuduGroupScan newScan = new KuduGroupScan(this);
+    newScan.columns = columns;
+    return newScan;
+  }
+
+  /**
+   * No endpoint affinity is reported for Kudu scans.
+   */
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    // Typed factory instead of the raw Collections.EMPTY_LIST constant (no unchecked warning).
+    return Collections.emptyList();
+  }
+
+  /**
+   * The scan always runs in a single fragment.
+   */
+  @Override
+  public int getMaxParallelizationWidth() {
+    return 1;
+  }
+
+  /**
+   * Intentionally a no-op: the endpoints are not used by this scan.
+   *
+   * @param incomingEndpoints ignored
+   */
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+  }
+
+  /**
+   * Builds the sub scan for a fragment. {@code minorFragmentId} is not consulted; the result
+   * always contains one {@link KuduSubScanSpec} for this scan's table and the current columns.
+   */
+  @Override
+  public KuduSubScan getSpecificScan(int minorFragmentId) {
+    return new KuduSubScan(storagePlugin, storagePluginConfig,
+        ImmutableList.of(new KuduSubScanSpec(kuduScanSpec.getTableName())),
+        this.columns);
+  }
+
+  /**
+   * Always reports {@link ScanStats#TRIVIAL_TABLE}; no real statistics are gathered.
+   */
+  @Override
+  public ScanStats getScanStats() {
+    return ScanStats.TRIVIAL_TABLE;
+  }
+
+  /**
+   * A scan is a leaf operator, so {@code children} must be empty; returns a copy of this scan.
+   */
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new KuduGroupScan(this);
+  }
+
+  @JsonIgnore
+  public KuduStoragePlugin getStoragePlugin() {
+    return storagePlugin;
+  }
+
+  @JsonIgnore
+  public String getTableName() {
+    return getKuduScanSpec().getTableName();
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public String toString() {
+    return "KuduGroupScan [KuduScanSpec="
+        + kuduScanSpec + ", columns="
+        + columns + "]";
+  }
+
+  @JsonProperty("storage")
+  public KuduStoragePluginConfig getStorageConfig() {
+    return this.storagePluginConfig;
+  }
+
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty
+  public KuduScanSpec getKuduScanSpec() {
+    return kuduScanSpec;
+  }
+
+  /**
+   * Project push-down is accepted for any column list.
+   */
+  @Override
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  /**
+   * Records whether a filter has been pushed into this scan.
+   *
+   * @param b the new flag value
+   */
+  @JsonIgnore
+  public void setFilterPushedDown(boolean b) {
+    // Store the argument; the flag was previously forced to true whatever was passed in.
+    this.filterPushedDown = b;
+  }
+
+  @JsonIgnore
+  public boolean isFilterPushedDown() {
+    return filterPushedDown;
+  }
+
+  /**
+   * Empty constructor, do not use, only for testing.
+   * Leaves the plugin, config, scan spec and columns unset.
+   */
+  @VisibleForTesting
+  public KuduGroupScan() {
+    super((String)null);
+  }
+
+  /**
+   * Do not use, only for testing.
+   */
+  @VisibleForTesting
+  public void setKuduScanSpec(KuduScanSpec kuduScanSpec) {
+    this.kuduScanSpec = kuduScanSpec;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
new file mode 100644
index 0000000..0200527
--- /dev/null
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kudu;
+
+import io.netty.buffer.DrillBuf;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kudu.KuduSubScan.KuduSubScanSpec;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.kududb.ColumnSchema;
+import org.kududb.Type;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduScanner;
+import org.kududb.client.KuduTable;
+import org.kududb.client.RowResult;
+import org.kududb.client.RowResultIterator;
+import org.kududb.client.shaded.com.google.common.collect.ImmutableMap;
+
+/**
+ * Record reader that copies rows from a Kudu table scan into Drill value vectors through a
+ * {@link VectorContainerWriter}.
+ */
+public class KuduRecordReader extends AbstractRecordReader {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduRecordReader.class);
+
+  /** Maximum number of rows written into one batch by {@link #next()}. */
+  private static final int TARGET_RECORD_COUNT = 4000;
+
+  private final KuduClient client;
+  private final KuduSubScanSpec scanSpec;
+  private KuduTable table;
+  private VectorContainerWriter containerWriter;
+  private MapWriter writer;
+  private KuduScanner scanner;
+  // Rows fetched from the scanner that have not been written yet; kept across next() calls.
+  private RowResultIterator iterator;
+  // Scratch buffer used to hand STRING/BINARY bytes to the vector writers.
+  private DrillBuf buffer;
+
+  /**
+   * @param client           Kudu client used to open the table and build the scanner
+   * @param subScanSpec      identifies the table to read
+   * @param projectedColumns columns registered with the base reader via {@code setColumns}
+   * @param context          fragment context supplying the managed scratch buffer
+   */
+  public KuduRecordReader(KuduClient client, KuduSubScan.KuduSubScanSpec subScanSpec,
+      List<SchemaPath> projectedColumns, FragmentContext context) {
+    setColumns(projectedColumns);
+    this.client = client;
+    buffer = context.getManagedBuffer();
+    scanSpec = subScanSpec;
+  }
+
+  /**
+   * Opens the table, builds a scanner over it and prepares the vector writer.
+   *
+   * @throws ExecutionSetupException wrapping any failure raised while doing so
+   */
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    try {
+      // Assign the field; a local of the same name used to shadow it, leaving the field null.
+      table = client.openTable(scanSpec.getTableName());
+      scanner = client.newScannerBuilder(table).build();
+      containerWriter = new VectorContainerWriter(output);
+      writer = containerWriter.rootAsMap();
+    } catch (Exception e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  /** Kudu column type to Drill major type; every entry is an optional (nullable) type. */
+  static final Map<Type, MajorType> TYPES;
+
+  static {
+    TYPES = ImmutableMap.<Type, MajorType> builder()
+        .put(Type.BINARY, Types.optional(MinorType.VARBINARY))
+        .put(Type.BOOL, Types.optional(MinorType.BIT))
+        .put(Type.DOUBLE, Types.optional(MinorType.FLOAT8))
+        .put(Type.FLOAT, Types.optional(MinorType.FLOAT4))
+        .put(Type.INT16, Types.optional(MinorType.INT))
+        .put(Type.INT32, Types.optional(MinorType.INT))
+        .put(Type.INT8, Types.optional(MinorType.INT))
+        .put(Type.INT64, Types.optional(MinorType.BIGINT))
+        .put(Type.STRING, Types.optional(MinorType.VARCHAR))
+        .put(Type.TIMESTAMP, Types.optional(MinorType.TIMESTAMP))
+        .build();
+  }
+
+  /**
+   * Writes up to {@link #TARGET_RECORD_COUNT} rows into the output vectors.
+   *
+   * <p>Rows left over in the current iterator when a batch fills up are written first on the
+   * following call. Previously the iterator was only drained right after a fetch, so a call
+   * that started with rows still pending returned 0 and those rows were lost.</p>
+   *
+   * @return the number of rows written; 0 once the scanner has no more rows
+   * @throws RuntimeException wrapping any exception raised while reading or writing
+   */
+  @Override
+  public int next() {
+    int rowCount = 0;
+    try {
+      while (rowCount < TARGET_RECORD_COUNT) {
+        if (iterator == null || !iterator.hasNext()) {
+          if (!scanner.hasMoreRows()) {
+            iterator = null;
+            break;
+          }
+          iterator = scanner.nextRows();
+          // Re-check: the freshly fetched iterator may itself be empty.
+          continue;
+        }
+        writer.setPosition(rowCount);
+        writeRow(iterator.next());
+        rowCount++;
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    containerWriter.setValueCount(rowCount);
+    return rowCount;
+  }
+
+  /**
+   * Writes every projected column of one Kudu row at the writer's current position.
+   * Null cells are skipped so the corresponding optional vector entry stays unset.
+   */
+  private void writeRow(final RowResult result) throws Exception {
+    final List<ColumnSchema> projection = result.getColumnProjection().getColumns();
+    for (int i = 0; i < projection.size(); i++) {
+      final ColumnSchema column = projection.get(i);
+      if (result.isNull(i)) {
+        continue;
+      }
+      final String name = column.getName();
+      switch (column.getType()) {
+      case STRING: {
+        final ByteBuffer buf = result.getBinary(i);
+        final int length = buf.remaining();
+        ensure(length);
+        buffer.setBytes(0, buf);
+        writer.varChar(name).writeVarChar(0, length, buffer);
+        break;
+      }
+      case BINARY: {
+        final ByteBuffer buf = result.getBinary(i);
+        final int length = buf.remaining();
+        ensure(length);
+        buffer.setBytes(0, buf);
+        writer.varBinary(name).writeVarBinary(0, length, buffer);
+        break;
+      }
+      case INT8:
+        writer.integer(name).writeInt(result.getByte(i));
+        break;
+      case INT16:
+        writer.integer(name).writeInt(result.getShort(i));
+        break;
+      case INT32:
+        writer.integer(name).writeInt(result.getInt(i));
+        break;
+      case INT64:
+        writer.bigInt(name).writeBigInt(result.getLong(i));
+        break;
+      case FLOAT:
+        writer.float4(name).writeFloat4(result.getFloat(i));
+        break;
+      case DOUBLE:
+        writer.float8(name).writeFloat8(result.getDouble(i));
+        break;
+      case BOOL:
+        writer.bit(name).writeBit(result.getBoolean(i) ? 1 : 0);
+        break;
+      case TIMESTAMP:
+        // NOTE(review): presumably converts Kudu microseconds to Drill milliseconds - confirm.
+        writer.timeStamp(name).writeTimeStamp(result.getLong(i) / 1000);
+        break;
+      default:
+        throw new UnsupportedOperationException("unsupported type " + column.getType());
+      }
+    }
+  }
+
+  /** Grows the scratch buffer, if necessary, so it can hold {@code length} bytes. */
+  private void ensure(final int length) {
+    buffer = buffer.reallocIfNeeded(length);
+  }
+
+  /**
+   * Nothing is released here.
+   * NOTE(review): the scanner is never explicitly closed; confirm whether the Kudu client
+   * requires that for scans abandoned before they are exhausted.
+   */
+  @Override
+  public void close() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
new file mode 100644
index 0000000..b3c2c4e
--- /dev/null
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kudu;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Turns a {@link KuduSubScan} into a {@link ScanBatch} with one {@link KuduRecordReader}
+ * per tablet scan spec in the sub scan.
+ */
+public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduScanBatchCreator.class);
+
+  /**
+   * @param context  fragment context handed to every reader and to the scan batch
+   * @param subScan  sub scan supplying the scan specs, the columns and the Kudu client
+   * @param children must be empty, a scan has no inputs
+   * @return a scan batch iterating over the created readers
+   * @throws ExecutionSetupException wrapping any failure raised while creating a reader
+   */
+  @Override
+  public ScanBatch getBatch(FragmentContext context, KuduSubScan subScan, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+
+    final List<RecordReader> recordReaders = Lists.newArrayList();
+    for (KuduSubScan.KuduSubScanSpec tabletSpec : subScan.getTabletScanSpecList()) {
+      try {
+        // A missing projection means "read everything".
+        List<SchemaPath> projection = subScan.getColumns();
+        if (projection == null) {
+          projection = GroupScan.ALL_COLUMNS;
+        }
+        final RecordReader reader =
+            new KuduRecordReader(subScan.getStorageEngine().getClient(), tabletSpec, projection, context);
+        recordReaders.add(reader);
+      } catch (Exception cause) {
+        throw new ExecutionSetupException(cause);
+      }
+    }
+
+    return new ScanBatch(subScan, context, recordReaders.iterator());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java
new file mode 100644
index 0000000..b669f79
--- /dev/null
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanSpec.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kudu;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Immutable, JSON-serializable holder for the name of the Kudu table to scan.
+ */
+public class KuduScanSpec {
+
+  private final String tableName;
+
+  /**
+   * @param tableName table name, bound to the {@code "tableName"} JSON property
+   */
+  @JsonCreator
+  public KuduScanSpec(@JsonProperty("tableName") final String tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * @return the table name given at construction
+   */
+  public String getTableName() {
+    return this.tableName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
new file mode 100644
index 0000000..294eabe
--- /dev/null
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kudu;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.kududb.client.ListTablesResponse;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
+/**
+ * Schema factory that registers one flat schema (no sub schemas) exposing the tables of a
+ * Kudu storage plugin.
+ */
+public class KuduSchemaFactory implements SchemaFactory {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSchemaFactory.class);
+
+  final String schemaName;
+  final KuduStoragePlugin plugin;
+
+  /**
+   * @param plugin plugin whose client is used to list tables
+   * @param name   name under which the schema is registered
+   */
+  public KuduSchemaFactory(KuduStoragePlugin plugin, String name) throws IOException {
+    this.plugin = plugin;
+    this.schemaName = name;
+  }
+
+  /**
+   * Adds a {@link KuduTables} schema named {@code schemaName} to {@code parent}.
+   */
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    KuduTables schema = new KuduTables(schemaName);
+    SchemaPlus hPlus = parent.add(schemaName, schema);
+    schema.setHolder(hPlus);
+  }
+
+  /**
+   * Schema listing the Kudu tables reachable through the enclosing factory's plugin.
+   */
+  class KuduTables extends AbstractSchema {
+
+    public KuduTables(String name) {
+      super(ImmutableList.<String>of(), name);
+    }
+
+    /**
+     * Currently a no-op; the holder is not retained.
+     */
+    public void setHolder(SchemaPlus plusOfThis) {
+    }
+
+    /** This schema has no sub schemas, so the lookup always yields {@code null}. */
+    @Override
+    public AbstractSchema getSubSchema(String name) {
+      return null;
+    }
+
+    @Override
+    public Set<String> getSubSchemaNames() {
+      return Collections.emptySet();
+    }
+
+    /**
+     * Builds a table object for {@code name} without checking that the table exists in Kudu.
+     */
+    @Override
+    public Table getTable(String name) {
+      KuduScanSpec scanSpec = new KuduScanSpec(name);
+      return new DrillKuduTable(schemaName, plugin, scanSpec);
+    }
+
+    /**
+     * Lists the table names reported by the Kudu client. Any failure is logged at WARN
+     * level and an empty set is returned instead (deliberate best effort).
+     */
+    @Override
+    public Set<String> getTableNames() {
+      try {
+        ListTablesResponse tablesList = plugin.getClient().getTablesList();
+        return Sets.newHashSet(tablesList.getTablesList());
+      } catch (Exception e) {
+        logger.warn("Failure reading kudu tables.", e);
+        // Typed factory, consistent with getSubSchemaNames(); avoids the raw EMPTY_SET constant.
+        return Collections.emptySet();
+      }
+    }
+
+    @Override
+    public String getTypeName() {
+      return KuduStoragePluginConfig.NAME;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
new file mode 100644
index 0000000..5e981b8
--- /dev/null
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kudu;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.kududb.client.KuduClient;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Drill storage plugin for Kudu. A {@link KuduClient} is built from the configured master
+ * addresses in the constructor and closed in {@link #close()}.
+ */
+public class KuduStoragePlugin extends AbstractStoragePlugin {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduStoragePlugin.class);
+
+  private final DrillbitContext context;
+  private final KuduStoragePluginConfig engineConfig;
+  private final KuduSchemaFactory schemaFactory;
+
+  @SuppressWarnings("unused")
+  private final String name;
+  private final KuduClient client;
+
+  /**
+   * @param configuration plugin configuration supplying the Kudu master addresses
+   * @param context the Drillbit context this plugin runs in
+   * @param name the plugin name, also used as the schema name
+   * @throws IOException declared by the signature and by the schema factory constructor
+   */
+  public KuduStoragePlugin(KuduStoragePluginConfig configuration, DrillbitContext context, String name)
+      throws IOException {
+    this.context = context;
+    this.schemaFactory = new KuduSchemaFactory(this, name);
+    this.engineConfig = configuration;
+    this.name = name;
+    this.client = new KuduClient.KuduClientBuilder(configuration.getMasterAddresses()).build();
+  }
+
+  /** No-op: the client is already created in the constructor. */
+  @Override
+  public void start() throws IOException {
+
+  }
+
+  /** Returns the Kudu client owned by this plugin. */
+  public KuduClient getClient() {
+    return client;
+  }
+
+  /** Closes the Kudu client owned by this plugin. */
+  @Override
+  public void close() throws Exception {
+    client.close();
+  }
+
+  /** Returns the Drillbit context passed at construction. */
+  public DrillbitContext getContext() {
+    return this.context;
+  }
+
+  /** This plugin supports reads. */
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  /**
+   * Deserializes the selection into a {@link KuduScanSpec} and wraps it in a group scan.
+   *
+   * @param userName not used by this implementation
+   * @param selection the JSON selection describing the scan
+   * @return a group scan for the selected table; the third constructor argument is passed as {@code null}
+   */
+  @Override
+  public KuduGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    KuduScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<KuduScanSpec>() {});
+    return new KuduGroupScan(this, scanSpec, null);
+  }
+
+  /** Delegates schema registration to the schema factory. */
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  /** Returns the configuration this plugin was created with. */
+  @Override
+  public KuduStoragePluginConfig getConfig() {
+    return engineConfig;
+  }
+
+  /**
+   * This plugin contributes no optimizer rules.
+   *
+   * @return an empty, type-safe set (instead of the raw {@code Collections.EMPTY_SET})
+   */
+  @Override
+  public Set<StoragePluginOptimizerRule> getOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
+    return Collections.emptySet();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java
new file mode 100644
index 0000000..e07f967
--- /dev/null
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePluginConfig.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kudu;
+
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Configuration of the Kudu storage plugin, registered with Jackson under the type name
+ * {@link #NAME}. Its only setting is the Kudu master address string.
+ */
+@JsonTypeName(KuduStoragePluginConfig.NAME)
+public class KuduStoragePluginConfig extends StoragePluginConfigBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduStoragePluginConfig.class);
+
+  public static final String NAME = "kudu";
+
+  private final String masterAddresses;
+
+  /**
+   * @param masterAddresses value of the {@code masterAddresses} JSON property; may be null
+   */
+  @JsonCreator
+  public KuduStoragePluginConfig(@JsonProperty("masterAddresses") String masterAddresses) {
+    this.masterAddresses = masterAddresses;
+  }
+
+  /** Returns the master address string exactly as configured. */
+  public String getMasterAddresses() {
+    return masterAddresses;
+  }
+
+  /** Hash derived solely from {@code masterAddresses}; a null value contributes 0. */
+  @Override
+  public int hashCode() {
+    final int addressHash = (masterAddresses != null) ? masterAddresses.hashCode() : 0;
+    return 31 + addressHash;
+  }
+
+  /** Two configs are equal when they have the same class and equal (or both null) master addresses. */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+    final KuduStoragePluginConfig that = (KuduStoragePluginConfig) obj;
+    if (masterAddresses != null) {
+      return masterAddresses.equals(that.masterAddresses);
+    }
+    return that.masterAddresses == null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
new file mode 100644
index 0000000..267ee77
--- /dev/null
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kudu;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+/**
+ * Physical sub-scan operator carrying what is needed to read from Kudu: the plugin
+ * configuration, the list of per-table scan specs and the projected columns.
+ * Serialized by Jackson under the type name {@code kudu-tablet-scan}.
+ */
+@JsonTypeName("kudu-tablet-scan")
+public class KuduSubScan extends AbstractBase implements SubScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSubScan.class);
+
+  // Serialized as the "storage" property; the getter below is excluded from JSON.
+  @JsonProperty
+  public final KuduStoragePluginConfig storage;
+
+
+  private final KuduStoragePlugin kuduStoragePlugin;
+  private final List<KuduSubScanSpec> tabletScanSpecList;
+  private final List<SchemaPath> columns;
+
+  /**
+   * Deserialization constructor: resolves the plugin instance from the injected registry.
+   *
+   * @param registry injected registry used to look up the plugin for {@code storage}
+   * @param storage plugin configuration; cast to {@link KuduStoragePluginConfig}
+   * @param tabletScanSpecList scan specs to read
+   * @param columns projected columns
+   * @throws ExecutionSetupException declared by the signature (raised by the registry lookup, presumably)
+   */
+  @JsonCreator
+  public KuduSubScan(@JacksonInject StoragePluginRegistry registry,
+                      @JsonProperty("storage") StoragePluginConfig storage,
+      @JsonProperty("tabletScanSpecList") LinkedList<KuduSubScanSpec> tabletScanSpecList,
+                      @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+    super((String) null);
+    this.kuduStoragePlugin = (KuduStoragePlugin) registry.getPlugin(storage);
+    this.storage = (KuduStoragePluginConfig) storage;
+    this.tabletScanSpecList = tabletScanSpecList;
+    this.columns = columns;
+  }
+
+  /**
+   * Direct constructor for callers that already hold the plugin instance.
+   *
+   * @param plugin the Kudu storage plugin
+   * @param config the plugin configuration
+   * @param tabletInfoList scan specs to read
+   * @param columns projected columns
+   */
+  public KuduSubScan(KuduStoragePlugin plugin, KuduStoragePluginConfig config,
+      List<KuduSubScanSpec> tabletInfoList, List<SchemaPath> columns) {
+    super((String) null);
+    this.kuduStoragePlugin = plugin;
+    this.storage = config;
+    this.columns = columns;
+    this.tabletScanSpecList = tabletInfoList;
+  }
+
+  /** Returns the scan specs this sub-scan reads. */
+  public List<KuduSubScanSpec> getTabletScanSpecList() {
+    return this.tabletScanSpecList;
+  }
+
+  /** Returns the plugin configuration; not serialized (the field itself is). */
+  @JsonIgnore
+  public KuduStoragePluginConfig getStorageConfig() {
+    return this.storage;
+  }
+
+  /** Returns the projected columns. */
+  public List<SchemaPath> getColumns() {
+    return this.columns;
+  }
+
+  /** Always {@code false}. */
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  /** Returns the resolved plugin instance; not serialized. */
+  @JsonIgnore
+  public KuduStoragePlugin getStorageEngine(){
+    return this.kuduStoragePlugin;
+  }
+
+  /** Dispatches to {@code visitSubScan} on the visitor. */
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  /**
+   * Returns a copy of this leaf operator.
+   *
+   * @param children must be empty; a sub-scan has no child operators
+   */
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    final KuduSubScan copy = new KuduSubScan(kuduStoragePlugin, storage, tabletScanSpecList, columns);
+    return copy;
+  }
+
+  /** A sub-scan has no children, so the iterator is empty. */
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  /** Serializable description of one table to scan, identified by its name. */
+  public static class KuduSubScanSpec {
+
+    private final String tableName;
+
+    @JsonCreator
+    public KuduSubScanSpec(@JsonProperty("tableName") String tableName) {
+      this.tableName = tableName;
+    }
+
+    /** Returns the name of the table to scan. */
+    public String getTableName() {
+      return this.tableName;
+    }
+
+  }
+
+  /**
+   * Reports the HBase sub-scan operator type constant.
+   * NOTE(review): presumably a placeholder until a Kudu-specific operator type exists - confirm.
+   */
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.HBASE_SUB_SCAN_VALUE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..3ba12c0
--- /dev/null
+++ b/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,9 @@
+{
+  "storage":{
+    kudu : {
+      type:"kudu",
+      masterAddresses: "172.31.1.99",
+      enabled: true
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/resources/drill-module.conf b/contrib/storage-kudu/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..e225600
--- /dev/null
+++ b/contrib/storage-kudu/src/main/resources/drill-module.conf
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//  This file tells Drill to consider this module when class path scanning.
+//  This file can also include any supplementary configuration information.
+//  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill: {
+  classpath.scanning: {
+    packages += "org.apache.drill.exec.store.kudu"
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java
new file mode 100644
index 0000000..a6fc69c
--- /dev/null
+++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java
@@ -0,0 +1,76 @@
+package org.apache.drill.store.kudu;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.kududb.ColumnSchema;
+import org.kududb.Schema;
+import org.kududb.Type;
+import org.kududb.client.Insert;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduScanner;
+import org.kududb.client.KuduSession;
+import org.kududb.client.KuduTable;
+import org.kududb.client.ListTablesResponse;
+import org.kududb.client.PartialRow;
+import org.kududb.client.RowResult;
+import org.kududb.client.RowResultIterator;
+
+
+/**
+ * Connectivity check against a live Kudu master: recreates a small table, inserts three
+ * rows and prints the projected {@code float} column of every row returned by a scan.
+ * Requires a Kudu master reachable at {@link #KUDU_MASTER}.
+ */
+public class TestKuduConnect {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestKuduConnect.class);
+
+  public static final String KUDU_MASTER = "172.31.1.99";
+  public static final String KUDU_TABLE = "demo";
+
+  @Test
+  public void abc() throws Exception {
+
+    try (KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build()) {
+
+      // Start from a clean slate: drop the table if a previous run left it behind.
+      ListTablesResponse tables = client.getTablesList(KUDU_TABLE);
+      if (!tables.getTablesList().isEmpty()) {
+        client.deleteTable(KUDU_TABLE);
+      }
+
+      List<ColumnSchema> columns = new ArrayList<>(5);
+      columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
+      columns.add(new ColumnSchema.ColumnSchemaBuilder("binary", Type.BINARY).build());
+      columns.add(new ColumnSchema.ColumnSchemaBuilder("boolean", Type.BOOL).build());
+      columns.add(new ColumnSchema.ColumnSchemaBuilder("float", Type.FLOAT).build());
+      columns.add(new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).build());
+
+      Schema schema = new Schema(columns);
+      client.createTable(KUDU_TABLE, schema);
+
+      KuduTable table = client.openTable(KUDU_TABLE);
+
+      KuduSession session = client.newSession();
+      try {
+        for (int i = 0; i < 3; i++) {
+          Insert insert = table.newInsert();
+          PartialRow row = insert.getRow();
+          row.addInt(0, i);
+          row.addBinary(1, ("Row " + i).getBytes());
+          row.addBoolean(2, i % 2 == 0);
+          row.addFloat(3, i + 0.01f);
+          row.addString(4, ("Row " + i));
+          session.apply(insert);
+        }
+      } finally {
+        // Release the session even if an insert fails.
+        session.close();
+      }
+
+      List<String> projectColumns = new ArrayList<>(1);
+      projectColumns.add("float");
+      KuduScanner scanner = client.newScannerBuilder(table)
+          .setProjectedColumnNames(projectColumns)
+          .build();
+      try {
+        while (scanner.hasMoreRows()) {
+          RowResultIterator results = scanner.nextRows();
+          while (results.hasNext()) {
+            RowResult result = results.next();
+            System.out.println(result.toStringLongFormat());
+          }
+        }
+      } finally {
+        // Release the scanner once the scan is done or has failed.
+        scanner.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java
new file mode 100644
index 0000000..1a6211f
--- /dev/null
+++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java
@@ -0,0 +1,12 @@
+package org.apache.drill.store.kudu;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+/** Smoke test that runs a single query against the {@code kudu} storage plugin. */
+public class TestKuduPlugin extends BaseTestQuery {
+
+  /** Selects everything from the {@code demo} table in the {@code kudu} schema. */
+  @Test
+  public void testBasicQuery() throws Exception {
+    final String query = "select * from kudu.demo;";
+    test(query);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduTableProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduTableProvider.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduTableProvider.java
new file mode 100644
index 0000000..ca8f9e1
--- /dev/null
+++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduTableProvider.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.store.kudu;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.config.LogicalPlanPersistence;
+import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
+import org.apache.drill.exec.store.kudu.config.KuduPStoreProvider;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Exercises {@link KuduPStoreProvider}: writes entries into two named stores, reads some of
+ * them back and checks the number of entries each store iterates over.
+ */
+public class TestKuduTableProvider extends BaseKuduTest {
+
+  private static KuduPStoreProvider provider;
+
+  @BeforeClass // mask Kudu cluster start function
+  public static void setUpBeforeTestKuduTableProvider() throws Exception {
+    provider = new KuduPStoreProvider(storagePluginConfig.getKuduConf(), "drill_store");
+    provider.start();
+  }
+
+  @Test
+  public void testTableProvider() throws IOException {
+    LogicalPlanPersistence lp = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(config);
+    PStore<String> kuduStore = provider.getStore(PStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("kudu").build());
+    kuduStore.put("", "v0");
+    kuduStore.put("k1", "v1");
+    kuduStore.put("k2", "v2");
+    kuduStore.put("k3", "v3");
+    kuduStore.put("k4", "v4");
+    kuduStore.put("k5", "v5");
+    kuduStore.put(".test", "testValue");
+
+    assertEquals("v0", kuduStore.get(""));
+    assertEquals("testValue", kuduStore.get(".test"));
+
+    int rowCount = 0;
+    for (Entry<String, String> entry : kuduStore) {
+      rowCount++;
+      System.out.println(entry.getKey() + "=" + entry.getValue());
+    }
+    assertEquals(7, rowCount);
+
+    PStore<String> kuduTestStore = provider.getStore(PStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("kudu.test").build());
+    kuduTestStore.put("", "v0");
+    kuduTestStore.put("k1", "v1");
+    kuduTestStore.put("k2", "v2");
+    kuduTestStore.put("k3", "v3");
+    kuduTestStore.put("k4", "v4");
+    kuduTestStore.put(".test", "testValue");
+
+    // Read back from the store that was just populated, not from the first one.
+    assertEquals("v0", kuduTestStore.get(""));
+    assertEquals("testValue", kuduTestStore.get(".test"));
+
+    rowCount = 0;
+    for (Entry<String, String> entry : kuduTestStore) {
+      rowCount++;
+      System.out.println(entry.getKey() + "=" + entry.getValue());
+    }
+    assertEquals(6, rowCount);
+  }
+
+  @AfterClass
+  public static void tearDownTestKuduTableProvider() {
+    // The provider is null when set-up failed before it was assigned.
+    if (provider != null) {
+      provider.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5dfb4512/contrib/storage-kudu/src/test/resources/logback.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/test/resources/logback.xml b/contrib/storage-kudu/src/test/resources/logback.xml
new file mode 100644
index 0000000..6ef172b
--- /dev/null
+++ b/contrib/storage-kudu/src/test/resources/logback.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <!-- Timestamp used to give every test run its own log file. -->
+  <timestamp key="bySecond" datePattern="yyyyMMdd'T'HHmmss"/>
+
+  <!-- Streams events to a Lilith viewer; the host defaults to localhost. -->
+  <appender name="SOCKET"
+    class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
+    <Compressing>true</Compressing>
+    <ReconnectionDelay>10000</ReconnectionDelay>
+    <IncludeCallerData>true</IncludeCallerData>
+    <RemoteHosts>${LILITH_HOSTNAME:-localhost}</RemoteHosts>
+  </appender>
+
+  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+    <!-- The property 'logback.log.dir' is defined in pom.xml -->
+    <file>${logback.log.dir:-./target/surefire-reports}/kudu-tests-${bySecond}.log</file>
+    <append>false</append>
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <logger name="org.apache.drill" additivity="false">
+    <level value="info" />
+    <appender-ref ref="FILE" />
+  </logger>
+
+  <!-- NOTE(review): same logger name as above; presumably this later level wins and both
+       appenders end up attached. Confirm that is intended. -->
+  <logger name="org.apache.drill" additivity="false">
+    <level value="debug" />
+    <appender-ref ref="SOCKET" />
+  </logger>
+
+  <logger name="org.apache.hadoop" additivity="false">
+    <level value="info" />
+    <appender-ref ref="FILE" />
+  </logger>
+
+  <root>
+    <level value="error" />
+    <appender-ref ref="STDOUT" />
+  </root>
+
+</configuration>


Mime
View raw message