From: sershe@apache.org
To: commits@hive.apache.org
Date: Mon, 12 Sep 2016 20:24:41 -0000
Subject: [09/31] hive git commit: HIVE-14217: Druid integration (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

HIVE-14217: Druid integration (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

Close apache/hive#98

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/58d1befa
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/58d1befa
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/58d1befa

Branch: refs/heads/hive-14535
Commit: 58d1befa2131254b53122b3573189ac1c5022217
Parents: 63fdb51
Author: Jesus Camacho Rodriguez
Authored: Fri Aug 12 12:55:46 2016 +0100
Committer: Jesus Camacho Rodriguez
Committed: Thu Sep 8 09:42:26 2016 +0100

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/Constants.java  |  10 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  11 +
 druid-handler/pom.xml                           | 201 ++++
 .../hadoop/hive/druid/DruidStorageHandler.java  | 109 ++
 .../hive/druid/DruidStorageHandlerUtils.java    |  90 ++
 .../hive/druid/HiveDruidOutputFormat.java       |  55 +
 .../druid/HiveDruidQueryBasedInputFormat.java   | 369 ++++++
 .../hadoop/hive/druid/HiveDruidSplit.java       |  83 ++
 .../serde/DruidGroupByQueryRecordReader.java    | 199 ++++
 .../druid/serde/DruidQueryRecordReader.java     | 142 +++
 .../serde/DruidSelectQueryRecordReader.java     | 106 ++
 .../hadoop/hive/druid/serde/DruidSerDe.java     | 343 ++++++
 .../hive/druid/serde/DruidSerDeUtils.java       |  83 ++
 .../serde/DruidTimeseriesQueryRecordReader.java |  93 ++
 .../druid/serde/DruidTopNQueryRecordReader.java | 106 ++
 .../hadoop/hive/druid/serde/DruidWritable.java  |  81 ++
 .../hadoop/hive/druid/QTestDruidSerDe.java      |  88 ++
 .../hive/druid/QTestDruidStorageHandler.java    |  34 +
 .../hadoop/hive/druid/TestDruidSerDe.java       | 576 ++++++++++
.../TestHiveDruidQueryBasedInputFormat.java | 101 ++ itests/qtest/pom.xml | 13 + packaging/pom.xml | 5 + pom.xml | 2 + .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 8 +- .../hadoop/hive/ql/exec/FunctionRegistry.java | 22 +- .../optimizer/calcite/HivePlannerContext.java | 17 +- .../calcite/druid/DruidIntervalUtils.java | 466 ++++++++ .../ql/optimizer/calcite/druid/DruidQuery.java | 1053 ++++++++++++++++++ .../optimizer/calcite/druid/DruidQueryType.java | 42 + .../ql/optimizer/calcite/druid/DruidRules.java | 591 ++++++++++ .../ql/optimizer/calcite/druid/DruidSchema.java | 51 + .../ql/optimizer/calcite/druid/DruidTable.java | 121 ++ .../optimizer/calcite/druid/HiveDruidConf.java | 33 + .../functions/HiveSqlCountAggFunction.java | 2 +- .../functions/HiveSqlMinMaxAggFunction.java | 2 +- .../functions/HiveSqlSumAggFunction.java | 2 +- .../reloperators/HiveDateGranularity.java | 54 + .../rules/HiveProjectSortTransposeRule.java | 5 + .../rules/HiveSortProjectTransposeRule.java | 5 + .../calcite/translator/ASTBuilder.java | 38 +- .../calcite/translator/ASTConverter.java | 9 +- .../translator/SqlFunctionConverter.java | 23 +- .../hadoop/hive/ql/parse/CalcitePlanner.java | 119 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 22 +- .../hadoop/hive/ql/plan/CreateTableDesc.java | 8 +- .../hadoop/hive/ql/plan/TableScanDesc.java | 7 + .../apache/hadoop/hive/ql/udf/UDFDateFloor.java | 506 +++++++++ .../hadoop/hive/ql/udf/UDFDateFloorDay.java | 39 + .../hadoop/hive/ql/udf/UDFDateFloorHour.java | 39 + .../hadoop/hive/ql/udf/UDFDateFloorMinute.java | 39 + .../hadoop/hive/ql/udf/UDFDateFloorMonth.java | 39 + .../hadoop/hive/ql/udf/UDFDateFloorQuarter.java | 39 + .../hadoop/hive/ql/udf/UDFDateFloorSecond.java | 39 + .../hadoop/hive/ql/udf/UDFDateFloorWeek.java | 39 + .../hadoop/hive/ql/udf/UDFDateFloorYear.java | 39 + .../calcite/TestCBORuleFiredOnlyOnce.java | 2 +- .../ql/udf/TestUDFDateFormatGranularity.java | 85 ++ .../test/queries/clientnegative/druid_address.q | 5 + .../test/queries/clientnegative/druid_buckets.q | 6 + .../queries/clientnegative/druid_datasource.q | 3 + .../queries/clientnegative/druid_external.q | 5 + .../queries/clientnegative/druid_location.q | 6 + .../queries/clientnegative/druid_partitions.q | 6 + .../test/queries/clientpositive/druid_basic1.q | 18 + .../test/queries/clientpositive/druid_basic2.q | 52 + .../queries/clientpositive/druid_intervals.q | 67 ++ .../queries/clientpositive/druid_timeseries.q | 94 ++ ql/src/test/queries/clientpositive/druid_topn.q | 75 ++ .../results/clientnegative/druid_address.q.out | 7 + .../results/clientnegative/druid_buckets.q.out | 8 + .../clientnegative/druid_datasource.q.out | 7 + .../results/clientnegative/druid_external.q.out | 7 + .../results/clientnegative/druid_location.q.out | 9 + .../clientnegative/druid_partitions.q.out | 8 + .../results/clientpositive/create_view.q.out | 2 + .../results/clientpositive/druid_basic1.q.out | 142 +++ .../results/clientpositive/druid_basic2.q.out | 533 +++++++++ .../clientpositive/druid_intervals.q.out | 398 +++++++ .../clientpositive/druid_timeseries.q.out | 566 ++++++++++ .../results/clientpositive/druid_topn.q.out | 419 +++++++ .../results/clientpositive/explain_ddl.q.out | 2 + .../clientpositive/explain_logical.q.out | 16 + .../test/results/clientpositive/join_view.q.out | 4 + .../clientpositive/llap/explainuser_1.q.out | 2 +- .../test/results/clientpositive/masking_2.q.out | 14 + .../test/results/clientpositive/masking_6.q.out | 8 + .../test/results/clientpositive/masking_7.q.out | 8 + 
.../clientpositive/serde_user_properties.q.out | 4 + .../results/clientpositive/show_functions.q.out | 9 + .../clientpositive/spark/join_view.q.out | 4 + .../results/clientpositive/subquery_notin.q.out | 6 + .../results/clientpositive/subquery_views.q.out | 4 + 92 files changed, 8969 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/common/src/java/org/apache/hadoop/hive/conf/Constants.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java index 00ec8c0..77c6aa5 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -15,8 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - package org.apache.hadoop.hive.conf; public class Constants { @@ -24,4 +22,12 @@ public class Constants { public static final String LLAP_LOGGER_NAME_QUERY_ROUTING = "query-routing"; public static final String LLAP_LOGGER_NAME_CONSOLE = "console"; public static final String LLAP_LOGGER_NAME_RFA = "RFA"; + + /* Constants for Druid storage handler */ + public static final String DRUID_HIVE_STORAGE_HANDLER_ID = + "org.apache.hadoop.hive.druid.DruidStorageHandler"; + public static final String DRUID_DATA_SOURCE = "druid.datasource"; + public static final String DRUID_QUERY_JSON = "druid.query.json"; + public static final String DRUID_QUERY_TYPE = "druid.query.type"; + public static final String DRUID_QUERY_FETCH = "druid.query.fetch"; } http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 13cfdf1..d6944ee 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1884,6 +1884,17 @@ public class HiveConf extends Configuration { WRITE_SET_REAPER_INTERVAL("hive.writeset.reaper.interval", "60s", new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"), + // For Druid storage handler + HIVE_DRUID_BROKER_DEFAULT_ADDRESS("hive.druid.broker.address.default", "localhost:8082", + "Address of the Druid broker. If we are querying Druid from Hive, this address needs to be\n" + + "declared"), + HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000, + "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" + + "per query. In order to do that, we obtain the estimated size for the complete result. If the\n" + + "number of records of the query results is larger than this threshold, we split the query in\n" + + "total number of rows/threshold parts across the time dimension. Note that we assume the\n" + + "records to be split uniformly across the time dimension"), + // For HBase storage handler HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true, "Whether writes to HBase should be forced to the write-ahead log. 
\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/pom.xml ---------------------------------------------------------------------- diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml new file mode 100644 index 0000000..2173cdc --- /dev/null +++ b/druid-handler/pom.xml @@ -0,0 +1,201 @@ + + + + 4.0.0 + + org.apache.hive + hive + 2.2.0-SNAPSHOT + ../pom.xml + + + hive-druid-handler + jar + Hive Druid Handler + + + .. + + + + + + + org.apache.hive + hive-exec + ${project.version} + + + io.netty + netty-all + + + io.netty + netty + + + + + + commons-lang + commons-lang + ${commons-lang.version} + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + true + + + org.slf4j + slf4j-log4j12 + + + commmons-logging + commons-logging + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + true + + + io.netty + netty + + + + + io.druid + druid-processing + ${druid.version} + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-databind + + + + + + + junit + junit + ${junit.version} + test + + + + + ${basedir}/src/java + ${basedir}/src/test + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven.shade.plugin.version} + + + + + package + + shade + + + true + false + + + io.druid + org.apache.hive.druid.io.druid + + + com.metamx.emitter + org.apache.hive.druid.com.metamx.emitter + + + com.metamx.http.client + org.apache.hive.druid.com.metamx.http.client + + + io.netty + org.apache.hive.druid.io.netty + + + org.jboss.netty + org.apache.hive.druid.org.jboss.netty + + + com.fasterxml.jackson + org.apache.hive.druid.com.fasterxml.jackson + + + + + io.druid:* + com.metamx:emitter:* + com.metamx:http-client:* + io.netty:* + com.fasterxml.jackson.core:* + com.fasterxml.jackson.datatype:* + com.fasterxml.jackson.dataformat:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java new file mode 100644 index 0000000..ac03099 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.druid; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.druid.serde.DruidSerDe; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DruidStorageHandler provides a HiveStorageHandler implementation for Druid. + */ +@SuppressWarnings({"deprecation","rawtypes"}) +public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHook { + + protected static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class); + + @Override + public Class getInputFormatClass() { + return HiveDruidQueryBasedInputFormat.class; + } + + @Override + public Class getOutputFormatClass() { + return HiveDruidOutputFormat.class; + } + + @Override + public Class getSerDeClass() { + return DruidSerDe.class; + } + + @Override + public HiveMetaHook getMetaHook() { + return this; + } + + @Override + public void preCreateTable(Table table) throws MetaException { + // Do safety checks + if (!MetaStoreUtils.isExternalTable(table)) { + throw new MetaException("Table in Druid needs to be declared as EXTERNAL"); + } + if (!StringUtils.isEmpty(table.getSd().getLocation())) { + throw new MetaException("LOCATION may not be specified for Druid"); + } + if (table.getPartitionKeysSize() != 0) { + throw new MetaException("PARTITIONED BY may not be specified for Druid"); + } + if (table.getSd().getBucketColsSize() != 0) { + throw new MetaException("CLUSTERED BY may not be specified for Druid"); + } + } + + @Override + public void rollbackCreateTable(Table table) throws MetaException { + // Nothing to do + } + + @Override + public void commitCreateTable(Table table) throws MetaException { + // Nothing to do + } + + @Override + public void preDropTable(Table table) throws MetaException { + // Nothing to do + } + + @Override + public void rollbackDropTable(Table table) throws MetaException { + // Nothing to do + } + + @Override + public void commitDropTable(Table table, boolean deleteData) throws MetaException { + // Nothing to do + } + + @Override + public String toString() { + return Constants.DRUID_HIVE_STORAGE_HANDLER_ID; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java new file mode 100644 index 0000000..c6b8024 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.concurrent.ExecutionException; + +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.HttpMethod; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.InputStreamResponseHandler; + +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.BaseQuery; + +/** + * Utils class for Druid storage handler. + */ +public final class DruidStorageHandlerUtils { + + private static final String SMILE_CONTENT_TYPE = "application/x-jackson-smile"; + + /** + * Mapper to use to serialize/deserialize Druid objects (JSON) + */ + public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); + + /** + * Mapper to use to serialize/deserialize Druid objects (SMILE) + */ + public static final ObjectMapper SMILE_MAPPER = new DefaultObjectMapper(new SmileFactory()); + + /** + * Method that creates a request for Druid JSON query (using SMILE). + * @param mapper + * @param address + * @param query + * @return + * @throws IOException + */ + public static Request createRequest(String address, BaseQuery query) + throws IOException { + return new Request(HttpMethod.POST, new URL(String.format("%s/druid/v2/", "http://" + address))) + .setContent(SMILE_MAPPER.writeValueAsBytes(query)) + .setHeader(HttpHeaders.Names.CONTENT_TYPE, SMILE_CONTENT_TYPE); + } + + /** + * Method that submits a request to an Http address and retrieves the result. + * The caller is responsible for closing the stream once it finishes consuming it. + * @param client + * @param request + * @return + * @throws IOException + */ + public static InputStream submitRequest(HttpClient client, Request request) + throws IOException { + InputStream response; + try { + response = client.go(request, new InputStreamResponseHandler()).get(); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } catch (InterruptedException e) { + throw new IOException(e.getCause()); + } + return response; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java new file mode 100644 index 0000000..45e31d6 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidOutputFormat.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.util.Progressable; + +/** + * Place holder for Druid output format. Currently not implemented. + */ +@SuppressWarnings("rawtypes") +public class HiveDruidOutputFormat implements HiveOutputFormat { + + @Override + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, + Progressable progress) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) + throws IOException { + throw new UnsupportedOperationException(); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java new file mode 100644 index 0000000..3df1452 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidQueryBasedInputFormat.java @@ -0,0 +1,369 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.druid; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader; +import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader; +import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader; +import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader; +import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader; +import org.apache.hadoop.hive.druid.serde.DruidWritable; +import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidIntervalUtils; +import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.joda.time.Interval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; + +import io.druid.query.Druids; +import io.druid.query.Druids.SegmentMetadataQueryBuilder; +import io.druid.query.Druids.SelectQueryBuilder; +import io.druid.query.Druids.TimeBoundaryQueryBuilder; +import io.druid.query.Query; +import io.druid.query.Result; +import io.druid.query.metadata.metadata.SegmentAnalysis; +import io.druid.query.metadata.metadata.SegmentMetadataQuery; +import io.druid.query.select.PagingSpec; +import io.druid.query.select.SelectQuery; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.timeboundary.TimeBoundaryQuery; +import io.druid.query.timeboundary.TimeBoundaryResultValue; + +/** + * Druid query based input format. + * + * Given a query and the Druid broker address, it will send it, and retrieve + * and parse the results. + */ +public class HiveDruidQueryBasedInputFormat extends InputFormat + implements org.apache.hadoop.mapred.InputFormat { + + protected static final Logger LOG = LoggerFactory.getLogger(HiveDruidQueryBasedInputFormat.class); + + @Override + public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits) + throws IOException { + return getInputSplits(job); + } + + @Override + public List getSplits(JobContext context) throws IOException, InterruptedException { + return Arrays. 
asList(getInputSplits(context.getConfiguration())); + } + + @SuppressWarnings("deprecation") + private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException { + String address = HiveConf.getVar(conf, + HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS); + if (StringUtils.isEmpty(address)) { + throw new IOException("Druid broker address not specified in configuration"); + } + String druidQuery = StringEscapeUtils.unescapeJava(conf.get(Constants.DRUID_QUERY_JSON)); + String druidQueryType; + if (StringUtils.isEmpty(druidQuery)) { + // Empty, maybe because CBO did not run; we fall back to + // full Select query + if (LOG.isWarnEnabled()) { + LOG.warn("Druid query is empty; creating Select query"); + } + String dataSource = conf.get(Constants.DRUID_DATA_SOURCE); + if (dataSource == null) { + throw new IOException("Druid data source cannot be empty"); + } + druidQuery = createSelectStarQuery(address, dataSource); + druidQueryType = Query.SELECT; + } else { + druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE); + if (druidQueryType == null) { + throw new IOException("Druid query type not recognized"); + } + } + + // hive depends on FileSplits + Job job = new Job(conf); + JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job); + Path [] paths = FileInputFormat.getInputPaths(jobContext); + + switch (druidQueryType) { + case Query.TIMESERIES: + case Query.TOPN: + case Query.GROUP_BY: + return new HiveDruidSplit[] { new HiveDruidSplit(address, druidQuery, paths[0]) }; + case Query.SELECT: + return splitSelectQuery(conf, address, druidQuery, paths[0]); + default: + throw new IOException("Druid query type not recognized"); + } + } + + private static String createSelectStarQuery(String address, String dataSource) throws IOException { + // Create Select query + SelectQueryBuilder builder = new Druids.SelectQueryBuilder(); + builder.dataSource(dataSource); + builder.intervals(Arrays.asList(DruidTable.DEFAULT_INTERVAL)); + builder.pagingSpec(PagingSpec.newSpec(1)); + Map context = new HashMap<>(); + context.put(Constants.DRUID_QUERY_FETCH, false); + builder.context(context); + return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build()); + } + + /* Method that splits Select query depending on the threshold so read can be + * parallelized */ + private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address, + String druidQuery, Path dummyPath) throws IOException { + final int selectThreshold = (int) HiveConf.getIntVar( + conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD); + + SelectQuery query; + try { + query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class); + } catch (Exception e) { + throw new IOException(e); + } + + final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false); + if (isFetch) { + // If it has a limit, we use it and we do not split the query + return new HiveDruidSplit[] { new HiveDruidSplit( + address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) }; + } + + // We do not have the number of rows, thus we need to execute a + // Segment Metadata query to obtain number of rows + SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder(); + metadataBuilder.dataSource(query.getDataSource()); + metadataBuilder.intervals(query.getIntervals()); + metadataBuilder.merge(true); + metadataBuilder.analysisTypes(); + SegmentMetadataQuery metadataQuery = metadataBuilder.build(); + + HttpClient client = 
HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle()); + InputStream response; + try { + response = DruidStorageHandlerUtils.submitRequest(client, + DruidStorageHandlerUtils.createRequest(address, metadataQuery)); + } catch (Exception e) { + throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + + // Retrieve results + List metadataList; + try { + metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response, + new TypeReference>() {}); + } catch (Exception e) { + response.close(); + throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + if (metadataList == null || metadataList.isEmpty()) { + throw new IOException("Connected to Druid but could not retrieve datasource information"); + } + if (metadataList.size() != 1) { + throw new IOException("Information about segments should have been merged"); + } + + final long numRows = metadataList.get(0).getNumRows(); + + query = query.withPagingSpec(PagingSpec.newSpec(selectThreshold)); + if (numRows <= selectThreshold) { + // We are not going to split it + return new HiveDruidSplit[] { new HiveDruidSplit(address, + DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) }; + } + + // If the query does not specify a timestamp, we obtain the total time using + // a Time Boundary query. Then, we use the information to split the query + // following the Select threshold configuration property + final List intervals = new ArrayList<>(); + if (query.getIntervals().size() == 1 && + query.getIntervals().get(0).equals(DruidTable.DEFAULT_INTERVAL)) { + // Default max and min, we should execute a time boundary query to get a + // more precise range + TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder(); + timeBuilder.dataSource(query.getDataSource()); + TimeBoundaryQuery timeQuery = timeBuilder.build(); + + try { + response = DruidStorageHandlerUtils.submitRequest(client, + DruidStorageHandlerUtils.createRequest(address, timeQuery)); + } catch (Exception e) { + throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + + // Retrieve results + List> timeList; + try { + timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response, + new TypeReference>>() {}); + } catch (Exception e) { + response.close(); + throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + if (timeList == null || timeList.isEmpty()) { + throw new IOException("Connected to Druid but could not retrieve time boundary information"); + } + if (timeList.size() != 1) { + throw new IOException("We should obtain a single time boundary"); + } + + intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(), + timeList.get(0).getValue().getMaxTime().getMillis())); + } else { + intervals.addAll(query.getIntervals()); + } + + // Create (numRows/default threshold) input splits + int numSplits = (int) Math.ceil((double) numRows / selectThreshold); + List> newIntervals = createSplitsIntervals(intervals, numSplits); + HiveDruidSplit[] splits = new HiveDruidSplit[numSplits]; + for (int i = 0; i < numSplits; i++) { + // Create partial Select query + final SelectQuery partialQuery = query.withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(newIntervals.get(i))); + splits[i] = new HiveDruidSplit(address, + DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath); + } + return splits; + } + + private static List> createSplitsIntervals(List intervals, int 
numSplits) { + final long totalTime = DruidIntervalUtils.extractTotalTime(intervals); + long startTime = intervals.get(0).getStartMillis(); + long endTime = startTime; + long currTime = 0; + List> newIntervals = new ArrayList<>(); + for (int i = 0, posIntervals = 0; i < numSplits; i++) { + final long rangeSize = Math.round( (double) (totalTime * (i + 1)) / numSplits) - + Math.round( (double) (totalTime * i) / numSplits); + // Create the new interval(s) + List currentIntervals = new ArrayList<>(); + while (posIntervals < intervals.size()) { + final Interval interval = intervals.get(posIntervals); + final long expectedRange = rangeSize - currTime; + if (interval.getEndMillis() - startTime >= expectedRange) { + endTime = startTime + expectedRange; + currentIntervals.add(new Interval(startTime, endTime)); + startTime = endTime; + currTime = 0; + break; + } + endTime = interval.getEndMillis(); + currentIntervals.add(new Interval(startTime, endTime)); + currTime += (endTime - startTime); + startTime = intervals.get(++posIntervals).getStartMillis(); + } + newIntervals.add(currentIntervals); + } + assert endTime == intervals.get(intervals.size()-1).getEndMillis(); + return newIntervals; + } + + @Override + public org.apache.hadoop.mapred.RecordReader getRecordReader( + org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter) + throws IOException { + // We need to provide a different record reader for every type of Druid query. + // The reason is that Druid results format is different for each type. + final DruidQueryRecordReader reader; + final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE); + if (druidQueryType == null) { + reader = new DruidSelectQueryRecordReader(); // By default + reader.initialize((HiveDruidSplit)split, job); + return reader; + } + switch (druidQueryType) { + case Query.TIMESERIES: + reader = new DruidTimeseriesQueryRecordReader(); + break; + case Query.TOPN: + reader = new DruidTopNQueryRecordReader(); + break; + case Query.GROUP_BY: + reader = new DruidGroupByQueryRecordReader(); + break; + case Query.SELECT: + reader = new DruidSelectQueryRecordReader(); + break; + default: + throw new IOException("Druid query type not recognized"); + } + reader.initialize((HiveDruidSplit)split, job); + return reader; + } + + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + // We need to provide a different record reader for every type of Druid query. + // The reason is that Druid results format is different for each type. 
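    // Illustrative summary of the dispatch below (same mapping as in the mapred
    // getRecordReader above; assumes the usual Druid names for the Query.* type
    // constants):
    //   timeseries -> DruidTimeseriesQueryRecordReader
    //   topN       -> DruidTopNQueryRecordReader
    //   groupBy    -> DruidGroupByQueryRecordReader
    //   select     -> DruidSelectQueryRecordReader
    // If druid.query.type is not set, a Select record reader is used by default.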
+ final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE); + if (druidQueryType == null) { + return new DruidSelectQueryRecordReader(); // By default + } + final DruidQueryRecordReader reader; + switch (druidQueryType) { + case Query.TIMESERIES: + reader = new DruidTimeseriesQueryRecordReader(); + break; + case Query.TOPN: + reader = new DruidTopNQueryRecordReader(); + break; + case Query.GROUP_BY: + reader = new DruidGroupByQueryRecordReader(); + break; + case Query.SELECT: + reader = new DruidSelectQueryRecordReader(); + break; + default: + throw new IOException("Druid query type not recognized"); + } + return reader; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java new file mode 100644 index 0000000..3fba5d0 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/HiveDruidSplit.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; + +/** + * Druid split. Its purpose is to trigger query execution in Druid. 
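 * The split serializes the broker address and the (possibly partial) Druid query
 * as JSON; getLocations() returns no real hosts because the data is fetched over
 * HTTP from the broker rather than read from HDFS blocks, and getLength() is 0
 * since the size of the query result is not known up front.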
+ */ +public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit { + + private String address; + private String druidQuery; + + // required for deserialization + public HiveDruidSplit() { + super((Path) null, 0, 0, (String[]) null); + } + + public HiveDruidSplit(String address, String druidQuery, Path dummyPath) { + super(dummyPath, 0, 0, (String[]) null); + this.address = address; + this.druidQuery = druidQuery; + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeUTF(address); + out.writeUTF(druidQuery); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + address = in.readUTF(); + druidQuery = in.readUTF(); + } + + @Override + public long getLength() { + return 0L; + } + + @Override + public String[] getLocations() { + return new String[] {""} ; + } + + public String getAddress() { + return address; + } + + public String getDruidQuery() { + return druidQuery; + } + + @Override + public String toString() { + return "HiveDruidSplit{" + address + ", " + druidQuery + "}"; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java new file mode 100644 index 0000000..226060f --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid.serde; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; + +import com.fasterxml.jackson.core.type.TypeReference; + +import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.groupby.GroupByQuery; + +/** + * Record reader for results for Druid GroupByQuery. 
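 * GroupBy rows may contain multi-valued dimensions, so the reader keeps one index
 * per dimension and emits a separate Hive row for every combination of dimension
 * values before moving on to the next Druid Row (see nextKeyValue()).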
+ */ +public class DruidGroupByQueryRecordReader + extends DruidQueryRecordReader { + + private Row current; + private int[] indexes = new int[0]; + // Row objects returned by GroupByQuery have different access paths depending on + // whether the result for the metric is a Float or a Long, thus we keep track + // using these converters + private Extract[] extractors; + + @Override + public void initialize(InputSplit split, Configuration conf) throws IOException { + super.initialize(split, conf); + initExtractors(); + } + + @Override + protected GroupByQuery createQuery(String content) throws IOException { + return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, GroupByQuery.class); + } + + @Override + protected List createResultsList(InputStream content) throws IOException { + return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content, + new TypeReference>(){}); + } + + private void initExtractors() throws IOException { + extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs().size()]; + int counter = 0; + for (int i = 0; i < query.getAggregatorSpecs().size(); i++, counter++) { + AggregatorFactory af = query.getAggregatorSpecs().get(i); + switch (af.getTypeName().toUpperCase()) { + case DruidSerDeUtils.FLOAT_TYPE: + extractors[counter] = Extract.FLOAT; + break; + case DruidSerDeUtils.LONG_TYPE: + extractors[counter] = Extract.LONG; + break; + default: + throw new IOException("Type not supported"); + } + } + for (int i = 0; i < query.getPostAggregatorSpecs().size(); i++, counter++) { + extractors[counter] = Extract.FLOAT; + } + } + + @Override + public boolean nextKeyValue() { + // Refresh indexes + for (int i = indexes.length - 1; i >= 0; i--) { + if (indexes[i] > 0) { + indexes[i]--; + for (int j = i + 1; j < indexes.length; j++) { + indexes[j] = current.getDimension( + query.getDimensions().get(j).getDimension()).size() - 1; + } + return true; + } + } + // Results + if (results.hasNext()) { + current = results.next(); + indexes = new int[query.getDimensions().size()]; + for (int i=0; i < query.getDimensions().size(); i++) { + DimensionSpec ds = query.getDimensions().get(i); + indexes[i] = current.getDimension(ds.getDimension()).size() - 1; + } + return true; + } + return false; + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public DruidWritable getCurrentValue() throws IOException, InterruptedException { + // Create new value + DruidWritable value = new DruidWritable(); + // 1) The timestamp column + value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + // 2) The dimension columns + for (int i=0; i < query.getDimensions().size(); i++) { + DimensionSpec ds = query.getDimensions().get(i); + List dims = current.getDimension(ds.getDimension()); + int pos = dims.size() - indexes[i] - 1; + value.getValue().put(ds.getOutputName(), dims.get(pos)); + } + int counter = 0; + // 3) The aggregation columns + for (AggregatorFactory af : query.getAggregatorSpecs()) { + switch (extractors[counter++]) { + case FLOAT: + value.getValue().put(af.getName(), current.getFloatMetric(af.getName())); + break; + case LONG: + value.getValue().put(af.getName(), current.getLongMetric(af.getName())); + break; + } + } + // 4) The post-aggregation columns + for (PostAggregator pa : query.getPostAggregatorSpecs()) { + assert extractors[counter++] == Extract.FLOAT; + value.getValue().put(pa.getName(), 
current.getFloatMetric(pa.getName())); + } + return value; + } + + @Override + public boolean next(NullWritable key, DruidWritable value) { + if (nextKeyValue()) { + // Update value + value.getValue().clear(); + // 1) The timestamp column + value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + // 2) The dimension columns + for (int i=0; i < query.getDimensions().size(); i++) { + DimensionSpec ds = query.getDimensions().get(i); + List dims = current.getDimension(ds.getDimension()); + int pos = dims.size() - indexes[i] - 1; + value.getValue().put(ds.getOutputName(), dims.get(pos)); + } + int counter = 0; + // 3) The aggregation columns + for (AggregatorFactory af : query.getAggregatorSpecs()) { + switch (extractors[counter++]) { + case FLOAT: + value.getValue().put(af.getName(), current.getFloatMetric(af.getName())); + break; + case LONG: + value.getValue().put(af.getName(), current.getLongMetric(af.getName())); + break; + } + } + // 4) The post-aggregation columns + for (PostAggregator pa : query.getPostAggregatorSpecs()) { + assert extractors[counter++] == Extract.FLOAT; + value.getValue().put(pa.getName(), current.getFloatMetric(pa.getName())); + } + return true; + } + return false; + } + + @Override + public float getProgress() throws IOException { + return results.hasNext() ? 0 : 1; + } + + private enum Extract { + FLOAT, + LONG + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java new file mode 100644 index 0000000..96bcee8 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.druid.serde; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.HiveDruidSplit; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterators; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; + +import io.druid.query.BaseQuery; + +/** + * Base record reader for given a Druid query. This class contains the logic to + * send the query to the broker and retrieve the results. The transformation to + * emit records needs to be done by the classes that extend the reader. + * + * The key for each record will be a NullWritable, while the value will be a + * DruidWritable containing the timestamp as well as all values resulting from + * the query. + */ +public abstract class DruidQueryRecordReader,R extends Comparable> + extends RecordReader + implements org.apache.hadoop.mapred.RecordReader { + + private static final Logger LOG = LoggerFactory.getLogger(DruidQueryRecordReader.class); + + /** + * Query that Druid executes. + */ + protected T query; + + /** + * Query results. + */ + protected Iterator results = Iterators.emptyIterator(); + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException { + initialize(split, context.getConfiguration()); + } + + public void initialize(InputSplit split, Configuration conf) throws IOException { + HiveDruidSplit hiveDruidSplit = (HiveDruidSplit) split; + + // Create query + query = createQuery(hiveDruidSplit.getDruidQuery()); + + // Execute query + if (LOG.isInfoEnabled()) { + LOG.info("Retrieving from druid using query:\n " + query); + } + + HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle()); + InputStream response = DruidStorageHandlerUtils.submitRequest(client, + DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query)); + + // Retrieve results + List resultsList; + try { + resultsList = createResultsList(response); + } catch (IOException e) { + response.close(); + throw e; + } + if (resultsList == null || resultsList.isEmpty()) { + return; + } + results = resultsList.iterator(); + } + + protected abstract T createQuery(String content) throws IOException; + + protected abstract List createResultsList(InputStream content) throws IOException; + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public DruidWritable createValue() { + return new DruidWritable(); + } + + @Override + public abstract boolean next(NullWritable key, DruidWritable value) throws IOException; + + @Override + public long getPos() { + return 0; + } + + @Override + public abstract boolean nextKeyValue() throws IOException; + + @Override + public abstract NullWritable getCurrentKey() throws IOException, InterruptedException; + + @Override + // TODO: we could generate vector row batches so that vectorized execution may get triggered + public abstract DruidWritable getCurrentValue() throws IOException, InterruptedException; + 
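  /*
   * Minimal sketch of how a concrete reader is driven (illustrative only, using
   * the classes introduced by this patch):
   *
   *   DruidQueryRecordReader<?, ?> reader = new DruidSelectQueryRecordReader();
   *   reader.initialize(hiveDruidSplit, conf);        // sends the query to the broker
   *   while (reader.nextKeyValue()) {
   *     DruidWritable row = reader.getCurrentValue(); // timestamp + query values
   *     // ... hand the row to the SerDe / operator pipeline
   *   }
   *   reader.close();
   */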
+ @Override + public abstract float getProgress() throws IOException; + + @Override + public void close() { + // Nothing to do + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java new file mode 100644 index 0000000..70b493c --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid.serde; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable; +import org.apache.hadoop.io.NullWritable; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Iterators; + +import io.druid.query.Result; +import io.druid.query.select.EventHolder; +import io.druid.query.select.SelectQuery; +import io.druid.query.select.SelectResultValue; + +/** + * Record reader for results for Druid SelectQuery. 
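 * Each Druid Result wraps a SelectResultValue with a list of EventHolder objects;
 * the reader iterates over those events and emits one Hive row per event, always
 * including the Druid timestamp column.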
+ */ +public class DruidSelectQueryRecordReader + extends DruidQueryRecordReader> { + + private Result current; + private Iterator values = Iterators.emptyIterator(); + + @Override + protected SelectQuery createQuery(String content) throws IOException { + return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, SelectQuery.class); + } + + @Override + protected List> createResultsList(InputStream content) throws IOException { + return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content, + new TypeReference>>(){}); + } + + @Override + public boolean nextKeyValue() throws IOException { + if (values.hasNext()) { + return true; + } + if (results.hasNext()) { + current = results.next(); + values = current.getValue().getEvents().iterator(); + return true; + } + return false; + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public DruidWritable getCurrentValue() throws IOException, InterruptedException { + // Create new value + DruidWritable value = new DruidWritable(); + value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + if (values.hasNext()) { + value.getValue().putAll(values.next().getEvent()); + return value; + } + return value; + } + + @Override + public boolean next(NullWritable key, DruidWritable value) throws IOException { + if (nextKeyValue()) { + // Update value + value.getValue().clear(); + value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis()); + if (values.hasNext()) { + value.getValue().putAll(values.next().getEvent()); + } + return true; + } + return false; + } + + @Override + public float getProgress() { + return results.hasNext() || values.hasNext() ? 0 : 1; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java new file mode 100644 index 0000000..8f53d4a --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java @@ -0,0 +1,343 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.metadata.metadata.ColumnAnalysis;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.topn.TopNQuery;
+
+/**
+ * DruidSerDe that is used to deserialize objects from a Druid data source.
+ */
+@SerDeSpec(schemaProps = {Constants.DRUID_DATA_SOURCE})
+public class DruidSerDe extends AbstractSerDe {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
+
+  private String[] columns;
+  private PrimitiveTypeInfo[] types;
+  private ObjectInspector inspector;
+
+
+  @Override
+  public void initialize(Configuration configuration, Properties properties) throws SerDeException {
+    final List<String> columnNames = new ArrayList<>();
+    final List<PrimitiveTypeInfo> columnTypes = new ArrayList<>();
+    List<ObjectInspector> inspectors = new ArrayList<>();
+
+    // Druid query
+    String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON);
+    if (druidQuery == null) {
+      // No query. We need to create a Druid Segment Metadata query that retrieves all
+      // columns present in the data source (dimensions and metrics).
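+      // For reference, the metadata query built below serializes to JSON roughly like
+      //   {"queryType":"segmentMetadata", "dataSource":"<datasource name>",
+      //    "merge":true, "analysisTypes":[]}
+      // (illustrative only; the exact field set depends on the Druid version in use).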
+      // Create Segment Metadata Query
+      String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
+      if (dataSource == null) {
+        throw new SerDeException("Druid data source not specified; use " +
+                Constants.DRUID_DATA_SOURCE + " in table properties");
+      }
+      SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
+      builder.dataSource(dataSource);
+      builder.merge(true);
+      builder.analysisTypes();
+      SegmentMetadataQuery query = builder.build();
+
+      // Execute query in Druid
+      String address = HiveConf.getVar(configuration,
+              HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
+      if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
+        throw new SerDeException("Druid broker address not specified in configuration");
+      }
+
+      // Infer schema
+      SegmentAnalysis schemaInfo;
+      try {
+        schemaInfo = submitMetadataRequest(address, query);
+      } catch (IOException e) {
+        throw new SerDeException(e);
+      }
+      for (Entry<String, ColumnAnalysis> columnInfo : schemaInfo.getColumns().entrySet()) {
+        if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+          // Special handling for timestamp column
+          columnNames.add(columnInfo.getKey()); // field name
+          PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type
+          columnTypes.add(type);
+          inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+          continue;
+        }
+        columnNames.add(columnInfo.getKey()); // field name
+        PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
+                columnInfo.getValue().getType()); // field type
+        columnTypes.add(type);
+        inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+      }
+      columns = columnNames.toArray(new String[columnNames.size()]);
+      types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
+      inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
+    } else {
+      // Query is specified, we can extract the results schema from the query
+      Query<?> query;
+      try {
+        query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, Query.class);
+      } catch (Exception e) {
+        throw new SerDeException(e);
+      }
+
+      switch (query.getType()) {
+        case Query.TIMESERIES:
+          inferSchema((TimeseriesQuery) query, columnNames, columnTypes);
+          break;
+        case Query.TOPN:
+          inferSchema((TopNQuery) query, columnNames, columnTypes);
+          break;
+        case Query.SELECT:
+          inferSchema((SelectQuery) query, columnNames, columnTypes);
+          break;
+        case Query.GROUP_BY:
+          inferSchema((GroupByQuery) query, columnNames, columnTypes);
+          break;
+        default:
+          throw new SerDeException("Not supported Druid query");
+      }
+
+      columns = new String[columnNames.size()];
+      types = new PrimitiveTypeInfo[columnNames.size()];
+      for (int i = 0; i < columnTypes.size(); ++i) {
+        columns[i] = columnNames.get(i);
+        types[i] = columnTypes.get(i);
+        inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
+      }
+      inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DruidSerDe initialized with\n" +
+              "\t columns: " + columnNames +
+              "\n\t types: " + columnTypes);
+    }
+  }
+
+  /* Submits the request and returns */
+  protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
+          throws SerDeException, IOException {
+    HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), new Lifecycle());
+    InputStream response;
+    try {
+      response =
+              DruidStorageHandlerUtils.submitRequest(client,
+              DruidStorageHandlerUtils.createRequest(address, query));
+    } catch (Exception e) {
+      throw new SerDeException(StringUtils.stringifyException(e));
+    }
+
+    // Retrieve results
+    List<SegmentAnalysis> resultsList;
+    try {
+      resultsList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+              new TypeReference<List<SegmentAnalysis>>() {});
+    } catch (Exception e) {
+      response.close();
+      throw new SerDeException(StringUtils.stringifyException(e));
+    }
+    if (resultsList == null || resultsList.isEmpty()) {
+      throw new SerDeException("Connected to Druid but could not retrieve datasource information");
+    }
+    if (resultsList.size() != 1) {
+      throw new SerDeException("Information about segments should have been merged");
+    }
+
+    return resultsList.get(0);
+  }
+
+  /* Timeseries query */
+  private void inferSchema(TimeseriesQuery query, List<String> columnNames,
+          List<PrimitiveTypeInfo> columnTypes) {
+    // Timestamp column
+    columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+    columnTypes.add(TypeInfoFactory.timestampTypeInfo);
+    // Aggregator columns
+    for (AggregatorFactory af : query.getAggregatorSpecs()) {
+      columnNames.add(af.getName());
+      columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+    }
+    // Post-aggregator columns
+    for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+      columnNames.add(pa.getName());
+      columnTypes.add(TypeInfoFactory.floatTypeInfo);
+    }
+  }
+
+  /* TopN query */
+  private void inferSchema(TopNQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+    // Timestamp column
+    columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+    columnTypes.add(TypeInfoFactory.timestampTypeInfo);
+    // Dimension column
+    columnNames.add(query.getDimensionSpec().getOutputName());
+    columnTypes.add(TypeInfoFactory.stringTypeInfo);
+    // Aggregator columns
+    for (AggregatorFactory af : query.getAggregatorSpecs()) {
+      columnNames.add(af.getName());
+      columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+    }
+    // Post-aggregator columns
+    for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+      columnNames.add(pa.getName());
+      columnTypes.add(TypeInfoFactory.floatTypeInfo);
+    }
+  }
+
+  /* Select query */
+  private void inferSchema(SelectQuery query, List<String> columnNames,
+          List<PrimitiveTypeInfo> columnTypes) {
+    // Timestamp column
+    columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+    columnTypes.add(TypeInfoFactory.timestampTypeInfo);
+    // Dimension columns
+    for (DimensionSpec ds : query.getDimensions()) {
+      columnNames.add(ds.getOutputName());
+      columnTypes.add(TypeInfoFactory.stringTypeInfo);
+    }
+    // Metric columns
+    for (String metric : query.getMetrics()) {
+      columnNames.add(metric);
+      columnTypes.add(TypeInfoFactory.floatTypeInfo);
+    }
+  }
+
+  /* GroupBy query */
+  private void inferSchema(GroupByQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+    // Timestamp column
+    columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+    columnTypes.add(TypeInfoFactory.timestampTypeInfo);
+    // Dimension columns
+    for (DimensionSpec ds : query.getDimensions()) {
+      columnNames.add(ds.getOutputName());
+      columnTypes.add(TypeInfoFactory.stringTypeInfo);
+    }
+    // Aggregator columns
+    for (AggregatorFactory af : query.getAggregatorSpecs()) {
+      columnNames.add(af.getName());
+      columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(af.getTypeName()));
+    }
+    // Post-aggregator columns
+    for (PostAggregator pa : query.getPostAggregatorSpecs()) {
+      columnNames.add(pa.getName());
+      columnTypes.add(TypeInfoFactory.floatTypeInfo);
+    }
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return NullWritable.class;
+  }
+
+  @Override
+  public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    throw new UnsupportedOperationException("SerdeStats not supported.");
+  }
+
+  @Override
+  public Object deserialize(Writable writable) throws SerDeException {
+    DruidWritable input = (DruidWritable) writable;
+    List<Object> output = Lists.newArrayListWithExpectedSize(columns.length);
+    for (int i = 0; i < columns.length; i++) {
+      final Object value = input.getValue().get(columns[i]);
+      if (value == null) {
+        output.add(null);
+        continue;
+      }
+      switch (types[i].getPrimitiveCategory()) {
+        case TIMESTAMP:
+          output.add(new TimestampWritable(new Timestamp((Long)value)));
+          break;
+        case LONG:
+          output.add(new LongWritable(((Number)value).longValue()));
+          break;
+        case FLOAT:
+          output.add(new FloatWritable(((Number)value).floatValue()));
+          break;
+        case STRING:
+          output.add(new Text(value.toString()));
+          break;
+        default:
+          throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
+      }
+    }
+    return output;
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return inspector;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
new file mode 100644
index 0000000..29b8845
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utils class for Druid SerDe.
+ */
+public final class DruidSerDeUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DruidSerDeUtils.class);
+
+  protected static final String FLOAT_TYPE = "FLOAT";
+  protected static final String LONG_TYPE = "LONG";
+  protected static final String STRING_TYPE = "STRING";
+
+  /* This method converts from the String representation of Druid type
+   * to the corresponding Hive type */
+  public static PrimitiveTypeInfo convertDruidToHiveType(String typeName) {
+    typeName = typeName.toUpperCase();
+    switch(typeName) {
+      case FLOAT_TYPE:
+        return TypeInfoFactory.floatTypeInfo;
+      case LONG_TYPE:
+        return TypeInfoFactory.longTypeInfo;
+      case STRING_TYPE:
+        return TypeInfoFactory.stringTypeInfo;
+      default:
+        // This is a guard for special Druid types e.g. hyperUnique
+        // (http://druid.io/docs/0.9.1.1/querying/aggregations.html#hyperunique-aggregator).
+        // Currently, we do not support doing anything special with them in Hive.
+        // However, those columns are there, and they can be actually read as normal
+        // dimensions e.g. with a select query. Thus, we print the warning and just read them
+        // as String.
+        LOG.warn("Transformation to STRING for unknown type " + typeName);
+        return TypeInfoFactory.stringTypeInfo;
+    }
+  }
+
+  /* This method converts from the String representation of Druid type
+   * to the String representation of the corresponding Hive type */
+  public static String convertDruidToHiveTypeString(String typeName) {
+    typeName = typeName.toUpperCase();
+    switch(typeName) {
+      case FLOAT_TYPE:
+        return serdeConstants.FLOAT_TYPE_NAME;
+      case LONG_TYPE:
+        return serdeConstants.BIGINT_TYPE_NAME;
+      case STRING_TYPE:
+        return serdeConstants.STRING_TYPE_NAME;
+      default:
+        // This is a guard for special Druid types e.g. hyperUnique
+        // (http://druid.io/docs/0.9.1.1/querying/aggregations.html#hyperunique-aggregator).
+        // Currently, we do not support doing anything special with them in Hive.
+        // However, those columns are there, and they can be actually read as normal
+        // dimensions e.g. with a select query. Thus, we print the warning and just read them
+        // as String.
+        LOG.warn("Transformation to STRING for unknown type " + typeName);
+        return serdeConstants.STRING_TYPE_NAME;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
new file mode 100644
index 0000000..812ae03
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import io.druid.query.Result;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesResultValue;
+
+/**
+ * Record reader for results for Druid TimeseriesQuery.
+ */
+public class DruidTimeseriesQueryRecordReader
+        extends DruidQueryRecordReader<TimeseriesQuery, Result<TimeseriesResultValue>> {
+
+  private Result<TimeseriesResultValue> current;
+
+  @Override
+  protected TimeseriesQuery createQuery(String content) throws IOException {
+    return DruidStorageHandlerUtils.JSON_MAPPER.readValue(content, TimeseriesQuery.class);
+  }
+
+  @Override
+  protected List<Result<TimeseriesResultValue>> createResultsList(InputStream content) throws IOException {
+    return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
+            new TypeReference<List<Result<TimeseriesResultValue>>>(){});
+  }
+
+  @Override
+  public boolean nextKeyValue() {
+    if (results.hasNext()) {
+      current = results.next();
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+    // Create new value
+    DruidWritable value = new DruidWritable();
+    value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+    value.getValue().putAll(current.getValue().getBaseObject());
+    return value;
+  }
+
+  @Override
+  public boolean next(NullWritable key, DruidWritable value) {
+    if (nextKeyValue()) {
+      // Update value
+      value.getValue().clear();
+      value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
+      value.getValue().putAll(current.getValue().getBaseObject());
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return results.hasNext() ? 0 : 1;
+  }
+
+}
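As a quick orientation for reviewers, here is a minimal, hypothetical sketch (not part of the patch) of how the type mapping in DruidSerDeUtils and the row representation in DruidWritable fit together; the class name DruidSerDeSketch and the "added" metric column are made up for illustration:

package org.apache.hadoop.hive.druid.serde;

import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidTable;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;

public class DruidSerDeSketch {

  public static void main(String[] args) {
    // Druid column types are mapped to Hive primitive types during schema inference;
    // unknown Druid types (e.g. hyperUnique) fall back to string with a warning.
    PrimitiveTypeInfo longType = DruidSerDeUtils.convertDruidToHiveType("LONG");          // bigint
    PrimitiveTypeInfo floatType = DruidSerDeUtils.convertDruidToHiveType("FLOAT");        // float
    PrimitiveTypeInfo otherType = DruidSerDeUtils.convertDruidToHiveType("hyperUnique");  // string

    // Each record reader emits one DruidWritable per row: a column-name -> value map
    // carrying the Druid timestamp column plus the selected dimensions/metrics,
    // which DruidSerDe.deserialize() then turns into Hive writables.
    DruidWritable row = new DruidWritable();
    row.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, System.currentTimeMillis());
    row.getValue().put("added", 57L); // hypothetical metric column

    System.out.println(longType + ", " + floatType + ", " + otherType + " -> " + row.getValue());
  }
}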