hive-commits mailing list archives

From hashut...@apache.org
Subject [2/2] hive git commit: HIVE-18976 : Add ability to setup Druid Kafka Ingestion from Hive (Nishant Bangarwa via Ashutosh Chauhan, Slim Bouguerra)
Date Wed, 04 Apr 2018 19:31:50 GMT
HIVE-18976 : Add ability to setup Druid Kafka Ingestion from Hive (Nishant Bangarwa via Ashutosh Chauhan, Slim Bouguerra)

Signed-off-by: Ashutosh Chauhan <hashutosh@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d3fed078
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d3fed078
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d3fed078

Branch: refs/heads/master
Commit: d3fed078cc47982822a29db3c8b03a61dcb5e7e9
Parents: d283899
Author: Nishant Bangarwa <nishant.monu51@gmail.com>
Authored: Fri Mar 16 11:25:00 2018 -0700
Committer: Ashutosh Chauhan <hashutosh@apache.org>
Committed: Wed Apr 4 12:30:51 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/Constants.java  |   7 +
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 data/scripts/kafka_init_data.json               |  10 +
 .../hadoop/hive/druid/DruidStorageHandler.java  | 341 ++++++++++++-
 .../hive/druid/DruidStorageHandlerUtils.java    | 148 +++++-
 .../hadoop/hive/druid/io/DruidOutputFormat.java | 116 +----
 .../druid/json/KafkaSupervisorIOConfig.java     | 247 ++++++++++
 .../hive/druid/json/KafkaSupervisorSpec.java    | 143 ++++++
 .../druid/json/KafkaSupervisorTuningConfig.java | 200 ++++++++
 .../hive/druid/json/KafkaTuningConfig.java      | 268 ++++++++++
 itests/qtest-druid/pom.xml                      |  11 +
 .../org/apache/hive/druid/MiniDruidCluster.java |  59 ++-
 .../hive/kafka/SingleNodeKafkaCluster.java      | 122 +++++
 .../hive/cli/TestMiniDruidKafkaCliDriver.java   |  63 +++
 .../test/resources/testconfiguration.properties |   2 +-
 .../hadoop/hive/cli/control/CliConfigs.java     |  23 +
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |  63 ++-
 pom.xml                                         |   2 +-
 .../clientpositive/druidkafkamini_basic.q       |  74 +++
 .../druid/druidkafkamini_basic.q.out            | 485 +++++++++++++++++++
 20 files changed, 2229 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
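
For orientation before the per-file diffs: this patch lets a Hive DDL statement declare a Druid-backed table that is fed from a Kafka topic, with the storage handler translating the table definition into a Druid Kafka supervisor spec. A minimal sketch of the intended usage, assuming an illustrative topic name, broker address and column list (not taken verbatim from this patch):

  CREATE TABLE druid_kafka_test (
    `__time` TIMESTAMP WITH LOCAL TIME ZONE,  -- mandatory Druid timestamp column
    `page` STRING,
    `added` INT)
  STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
  TBLPROPERTIES (
    "kafka.topic" = "test-topic",                  -- marks the table as Kafka-streaming
    "kafka.bootstrap.servers" = "localhost:9092",  -- Kafka connect string
    "druid.segment.granularity" = "HOUR");

Ingestion itself is then started, stopped or reset through the druid.kafka.ingestion table property added in Constants.java below.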


http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 5535d69..ff9eb59 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -48,6 +48,13 @@ public class Constants {
   public static final String DRUID_SEGMENT_VERSION = "druid.segment.version";
   public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory";
 
+  public static final String KAFKA_TOPIC = "kafka.topic";
+  public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+
+  public static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion.";
+  /* Kafka Ingestion state - valid values - START/STOP/RESET */
+  public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion";
+
   public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD";
   public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD";
   public static final String HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG = "hadoop.security.credential.provider.path";
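
A hedged sketch of how the new constants surface in DDL (table name and values are illustrative): the ingestion state is toggled through druid.kafka.ingestion, and supervisor-level knobs are passed with the druid.kafka.ingestion. prefix.

  ALTER TABLE druid_kafka_test SET TBLPROPERTIES ("druid.kafka.ingestion" = "START");  -- or "STOP" / "RESET"
  ALTER TABLE druid_kafka_test SET TBLPROPERTIES ("druid.kafka.ingestion.useEarliestOffset" = "true");

As the storage handler changes further down show, druid.kafka.ingestion is consumed and removed from the table parameters rather than persisted, so the Druid overlord stays the single source of truth for supervisor state.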

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 02367eb..1fd824b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2091,6 +2091,9 @@ public class HiveConf extends Configuration {
     HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS("hive.druid.coordinator.address.default", "localhost:8081",
             "Address of the Druid coordinator. It is used to check the load status of newly created segments"
     ),
+    HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS("hive.druid.overlord.address.default", "localhost:8090",
+        "Address of the Druid overlord. It is used to submit indexing tasks to druid."
+    ),
     HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000,
         "Takes only effect when hive.druid.select.distribute is set to false. \n" +
         "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/data/scripts/kafka_init_data.json
----------------------------------------------------------------------
diff --git a/data/scripts/kafka_init_data.json b/data/scripts/kafka_init_data.json
new file mode 100644
index 0000000..9e2c58c
--- /dev/null
+++ b/data/scripts/kafka_init_data.json
@@ -0,0 +1,10 @@
+{"__time": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
+{"__time": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
+{"__time": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
+{"__time": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
+{"__time": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}
+{"__time": "2013-09-01T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
+{"__time": "2013-09-01T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
+{"__time": "2013-09-01T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
+{"__time": "2013-09-01T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
+{"__time": "2013-09-01T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 0904bd1..76540b7 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -24,6 +24,7 @@ import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -32,6 +33,16 @@ import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.HttpClientConfig;
 import com.metamx.http.client.HttpClientInit;
+import com.metamx.http.client.Request;
+import com.metamx.http.client.response.StatusResponseHandler;
+import com.metamx.http.client.response.StatusResponseHolder;
+import io.druid.data.input.impl.DimensionSchema;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.JSONParseSpec;
+import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.java.util.common.Pair;
 import io.druid.metadata.MetadataStorageConnectorConfig;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
@@ -39,6 +50,10 @@ import io.druid.metadata.storage.derby.DerbyConnector;
 import io.druid.metadata.storage.derby.DerbyMetadataStorage;
 import io.druid.metadata.storage.mysql.MySQLConnector;
 import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.granularity.GranularitySpec;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
@@ -53,12 +68,16 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
 import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
+import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig;
+import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec;
+import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig;
 import org.apache.hadoop.hive.druid.security.KerberosHttpClient;
 import org.apache.hadoop.hive.druid.serde.DruidSerDe;
 import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -69,11 +88,16 @@ import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorization
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.ShutdownHookManager;
+
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
 import org.joda.time.Period;
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
@@ -83,9 +107,13 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -254,9 +282,242 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
       // For external tables, we do not need to do anything else
       return;
     }
+    if(isKafkaStreamingTable(table)){
+      updateKafkaIngestion(table);
+    }
     loadDruidSegments(table, true);
   }
 
+  private void updateKafkaIngestion(Table table){
+    final String overlordAddress = HiveConf
+        .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS);
+
+    final String dataSourceName = Preconditions.checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE), "Druid datasource name is null");
+
+    final String kafkaTopic = Preconditions.checkNotNull(getTableProperty(table, Constants.KAFKA_TOPIC), "kafka topic is null");
+    final String kafka_servers = Preconditions.checkNotNull(getTableProperty(table, Constants.KAFKA_BOOTSTRAP_SERVERS), "kafka connect string is null");
+
+    Properties tableProperties = new Properties();
+    tableProperties.putAll(table.getParameters());
+
+    final GranularitySpec granularitySpec = DruidStorageHandlerUtils.getGranularitySpec(getConf(), tableProperties);
+
+    List<FieldSchema> columns = table.getSd().getCols();
+    List<String> columnNames = new ArrayList<>(columns.size());
+    List<TypeInfo> columnTypes = new ArrayList<>(columns.size());
+
+    for(FieldSchema schema: columns) {
+      columnNames.add(schema.getName());
+      columnTypes.add(TypeInfoUtils.getTypeInfoFromTypeString(schema.getType()));
+    }
+
+    Pair<List<DimensionSchema>, AggregatorFactory[]> dimensionsAndAggregates = DruidStorageHandlerUtils
+        .getDimensionsAndAggregates(getConf(), columnNames, columnTypes);
+    if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
+      throw new IllegalStateException(
+          "Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN +
+              "') not specified in create table; list of columns is : " +
+              columnNames);
+    }
+
+    final InputRowParser inputRowParser = new StringInputRowParser(
+        new JSONParseSpec(
+            new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
+            new DimensionsSpec(dimensionsAndAggregates.lhs, null, null),
+            null,
+            null
+        ), "UTF-8");
+
+    Map<String, Object> inputParser = DruidStorageHandlerUtils.JSON_MAPPER
+        .convertValue(inputRowParser, Map.class);
+    final DataSchema dataSchema = new DataSchema(
+        dataSourceName,
+        inputParser,
+        dimensionsAndAggregates.rhs,
+        granularitySpec,
+        DruidStorageHandlerUtils.JSON_MAPPER
+    );
+
+    IndexSpec indexSpec = DruidStorageHandlerUtils.getIndexSpec(getConf());
+
+    KafkaSupervisorSpec spec = createKafkaSupervisorSpec(table, kafkaTopic, kafka_servers,
+        dataSchema, indexSpec);
+
+    // Fetch existing Ingestion Spec from Druid, if any
+    KafkaSupervisorSpec existingSpec = fetchKafkaIngestionSpec(table);
+    String targetState = getTableProperty(table, Constants.DRUID_KAFKA_INGESTION);
+    if(targetState == null){
+      // The user has not specified an ingestion state in the current command:
+      // if a Kafka supervisor is already running, keep it running (START); otherwise leave ingestion stopped (STOP).
+      targetState = existingSpec == null ? "STOP" : "START";
+    }
+
+    if(targetState.equalsIgnoreCase("STOP")){
+      if(existingSpec != null){
+        stopKafkaIngestion(overlordAddress, dataSourceName);
+      }
+    } else if(targetState.equalsIgnoreCase("START")){
+      if(existingSpec == null || !existingSpec.equals(spec)){
+        updateKafkaIngestionSpec(overlordAddress, spec);
+      }
+    } else if(targetState.equalsIgnoreCase("RESET")){
+      // The spec may have changed as well (e.g. multiple table properties altered at once); push the update before resetting offsets.
+      if(existingSpec != null && !existingSpec.equals(spec)){
+        updateKafkaIngestionSpec(overlordAddress, spec);
+      }
+      resetKafkaIngestion(overlordAddress, dataSourceName);
+    } else {
+      throw new IllegalArgumentException(String.format("Invalid value for property [%s]; valid values are START, STOP and RESET", Constants.DRUID_KAFKA_INGESTION));
+    }
+    // We do not want to keep this state in two separate places, so remove it from the Hive table properties.
+    table.getParameters().remove(Constants.DRUID_KAFKA_INGESTION);
+  }
+
+  private static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, String kafkaTopic,
+      String kafka_servers, DataSchema dataSchema, IndexSpec indexSpec) {
+    return new KafkaSupervisorSpec(dataSchema,
+          new KafkaSupervisorTuningConfig(
+              getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsInMemory"),
+              getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"),
+              getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"),
+              null, // basePersistDirectory - use druid default, no need to be configured by user
+              getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"),
+              indexSpec,
+              null, // buildV9Directly - use druid default, no need to be configured by user
+              getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "reportParseExceptions"),
+              getLongProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "handoffConditionTimeout"),
+              getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "resetOffsetAutomatically"),
+              getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"),
+              getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatThreads"),
+              getLongProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatRetries"),
+              getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "httpTimeout"),
+              getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "shutdownTimeout"),
+              getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "offsetFetchPeriod")),
+          new KafkaSupervisorIOConfig(kafkaTopic, // Mandatory Property
+              getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "replicas"),
+              getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskCount"),
+              getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskDuration"),
+              ImmutableMap.of(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY,
+                  kafka_servers), // Mandatory Property
+              getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "startDelay"),
+              getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "period"),
+              getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "useEarliestOffset"),
+              getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "completionTimeout"),
+              getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "lateMessageRejectionPeriod"),
+              getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "earlyMessageRejectionPeriod"),
+              getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "skipOffsetGaps")),
+          new HashMap<String, Object>()
+      );
+  }
+
+  private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec spec) {
+    try {
+      String task = DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(spec);
+      console.printInfo("submitting kafka Spec {}", task);
+      LOG.info("submitting kafka Supervisor Spec {}", task);
+
+      StatusResponseHolder response = getHttpClient().go(new Request(HttpMethod.POST,
+              new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress)))
+              .setContent(
+                  "application/json",
+                  DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsBytes(spec)),
+          new StatusResponseHandler(
+              Charset.forName("UTF-8"))).get();
+      if (response.getStatus().equals(HttpResponseStatus.OK)) {
+        String msg = String.format("Kafka Supervisor for [%s] Submitted Successfully to druid.", spec.getDataSchema().getDataSource());
+        LOG.info(msg);
+        console.printInfo(msg);
+      } else {
+        throw new IOException(String
+            .format("Unable to update Kafka Ingestion for Druid status [%d] full response [%s]",
+                response.getStatus().getCode(), response.getContent()));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void resetKafkaIngestion(String overlordAddress, String dataSourceName) {
+    try {
+      StatusResponseHolder response = RetryUtils
+          .retry(() -> getHttpClient().go(new Request(HttpMethod.POST,
+                  new URL(String
+                      .format("http://%s/druid/indexer/v1/supervisor/%s/reset", overlordAddress,
+                          dataSourceName))),
+              new StatusResponseHandler(
+                  Charset.forName("UTF-8"))).get(),
+              input -> input instanceof IOException,
+              getMaxRetryCount());
+      if (response.getStatus().equals(HttpResponseStatus.OK)) {
+        console.printInfo("Druid Kafka Ingestion Reset successful.");
+      } else {
+        throw new IOException(String
+            .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]",
+                response.getStatus().getCode(), response.getContent()));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void stopKafkaIngestion(String overlordAddress, String dataSourceName) {
+    try {
+      StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient()
+              .go(new Request(HttpMethod.POST,
+                      new URL(String
+                          .format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", overlordAddress,
+                              dataSourceName))),
+                  new StatusResponseHandler(
+                      Charset.forName("UTF-8"))).get(),
+          input -> input instanceof IOException,
+          getMaxRetryCount());
+      if (response.getStatus().equals(HttpResponseStatus.OK)) {
+        console.printInfo("Druid Kafka Ingestion shutdown successful.");
+      } else {
+        throw new IOException(String
+            .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]",
+                response.getStatus().getCode(), response.getContent()));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+  }
+
+  public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
+    // Fetch the existing Kafka supervisor spec for this datasource from the Druid overlord, if any.
+    final String overlordAddress = Preconditions.checkNotNull(HiveConf
+            .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS),
+        "Druid Overlord Address is null");
+    String dataSourceName = Preconditions
+        .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE),
+            "Druid Datasource name is null");
+    try {
+      StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient().go(new Request(HttpMethod.GET,
+              new URL(String
+                  .format("http://%s/druid/indexer/v1/supervisor/%s", overlordAddress,
+                      dataSourceName))),
+          new StatusResponseHandler(
+              Charset.forName("UTF-8"))).get(),
+          input -> input instanceof IOException,
+          getMaxRetryCount());
+      if (response.getStatus().equals(HttpResponseStatus.OK)) {
+        return DruidStorageHandlerUtils.JSON_MAPPER
+            .readValue(response.getContent(), KafkaSupervisorSpec.class);
+        // Druid returns 400 Bad Request when the supervisor is not found.
+      } else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND) || response.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) {
+        LOG.debug("No Kafka Supervisor found for datasource[%s]", dataSourceName);
+        return null;
+      } else {
+        throw new IOException(String
+            .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]",
+                response.getStatus().getCode(), response.getContent()));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Exception while fetching kafka ingestion spec from druid", e);
+    }
+  }
+
 
   protected void loadDruidSegments(Table table, boolean overwrite) throws MetaException {
     // at this point we have Druid segments from reducers but we need to atomically
@@ -323,7 +584,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
   private int checkLoadStatus(List<DataSegment> segments){
     final String coordinatorAddress = HiveConf
             .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS);
-    int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
+    int maxTries = getMaxRetryCount();
     if (maxTries == 0) {
       return segments.size();
     }
@@ -457,6 +718,16 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     if (MetaStoreUtils.isExternalTable(table)) {
       return;
     }
+    if(isKafkaStreamingTable(table)) {
+      // Stop Kafka Ingestion first
+      final String overlordAddress = Preconditions.checkNotNull(HiveConf
+              .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS),
+          "Druid Overlord Address is null");
+      String dataSourceName = Preconditions
+          .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE),
+              "Druid Datasource name is null");
+      stopKafkaIngestion(overlordAddress, dataSourceName);
+    }
     String dataSourceName = Preconditions
             .checkNotNull(table.getParameters().get(Constants.DRUID_DATA_SOURCE),
                     "DataSource name is null !"
@@ -688,11 +959,77 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
 
   @Override
   public void preAlterTable(Table table, EnvironmentContext context) throws MetaException {
-    String alterOpType = context == null ? null : context.getProperties().get(ALTER_TABLE_OPERATION_TYPE);
+    String alterOpType =
+        context == null ? null : context.getProperties().get(ALTER_TABLE_OPERATION_TYPE);
     // alterOpType is null in case of stats update
     if (alterOpType != null && !allowedAlterTypes.contains(alterOpType)) {
       throw new MetaException(
           "ALTER TABLE can not be used for " + alterOpType + " to a non-native table ");
     }
+    if(isKafkaStreamingTable(table)){
+      updateKafkaIngestion(table);
+    }
+  }
+  private static <T> Boolean getBooleanProperty(Table table, String propertyName) {
+    String val = getTableProperty(table, propertyName);
+    if (val == null) {
+      return null;
+    }
+    return Boolean.parseBoolean(val);
+  }
+
+  private static <T> Integer getIntegerProperty(Table table, String propertyName) {
+    String val = getTableProperty(table, propertyName);
+    if (val == null) {
+      return null;
+    }
+    try {
+      return Integer.parseInt(val);
+    } catch (NumberFormatException e) {
+      throw new NumberFormatException(String
+          .format("Exception while parsing property[%s] with Value [%s] as Integer", propertyName,
+              val));
+    }
+  }
+
+  private static <T> Long getLongProperty(Table table, String propertyName) {
+    String val = getTableProperty(table, propertyName);
+    if (val == null) {
+      return null;
+    }
+    try {
+      return Long.parseLong(val);
+    } catch (NumberFormatException e) {
+      throw new NumberFormatException(String
+          .format("Exception while parsing property[%s] with Value [%s] as Long", propertyName,
+              val));
+    }
+  }
+
+  private static <T> Period getPeriodProperty(Table table, String propertyName) {
+    String val = getTableProperty(table, propertyName);
+    if (val == null) {
+      return null;
+    }
+    try {
+      return Period.parse(val);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String
+          .format("Exception while parsing property[%s] with Value [%s] as Period", propertyName,
+              val));
+    }
+  }
+
+  private static String getTableProperty(Table table, String propertyName) {
+    return table.getParameters().get(propertyName);
+  }
+
+  private static boolean isKafkaStreamingTable(Table table){
+    // For Kafka streaming tables it is mandatory to set a Kafka topic.
+    return getTableProperty(table, Constants.KAFKA_TOPIC) != null;
+  }
+
+  private int getMaxRetryCount() {
+    return HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
   }
 }
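
createKafkaSupervisorSpec() above reads every optional tuning and IO setting from table properties carrying the druid.kafka.ingestion. prefix, parsing them as Integer, Long, Boolean or ISO-8601 Period and leaving them null (Druid defaults) when absent. An illustrative ALTER TABLE, with values that are assumptions rather than defaults mandated by this patch:

  ALTER TABLE druid_kafka_test SET TBLPROPERTIES (
    "druid.kafka.ingestion.maxRowsInMemory" = "75000",   -- tuning config, parsed as Integer
    "druid.kafka.ingestion.taskCount" = "2",             -- IO config, parsed as Integer
    "druid.kafka.ingestion.taskDuration" = "PT1H",       -- parsed as an ISO-8601 Period
    "druid.kafka.ingestion.reportParseExceptions" = "true");

Because preAlterTable() also calls updateKafkaIngestion() for Kafka streaming tables, an ALTER TABLE like this re-submits the supervisor spec to the overlord whenever it differs from the one currently running.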

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 233b288..1424237 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -17,20 +17,11 @@
  */
 package org.apache.hadoop.hive.druid;
 
-import com.fasterxml.jackson.databind.InjectableValues;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Interner;
-import com.google.common.collect.Interners;
-import com.metamx.common.JodaUtils;
-import com.metamx.emitter.EmittingLogger;
-import com.metamx.emitter.core.NoopEmitter;
-import com.metamx.emitter.service.ServiceEmitter;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.response.InputStreamResponseHandler;
+import io.druid.data.input.impl.DimensionSchema;
+import io.druid.data.input.impl.StringDimensionSchema;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.granularity.Granularity;
 import io.druid.math.expr.ExprMacroTable;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
@@ -47,6 +38,14 @@ import io.druid.query.expression.TrimExprMacro;
 import io.druid.query.select.SelectQueryConfig;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.data.ConciseBitmapSerdeFactory;
+import io.druid.segment.data.RoaringBitmapSerdeFactory;
+import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
@@ -64,33 +63,50 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.serde.HiveDruidSerializationModule;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.util.StringUtils;
 
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.common.io.CharStreams;
+import com.metamx.common.JodaUtils;
 import com.metamx.common.MapUtils;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.NoopEmitter;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.Request;
+import com.metamx.http.client.response.InputStreamResponseHandler;
 
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.chrono.ISOChronology;
-import org.skife.jdbi.v2.FoldController;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.PreparedBatch;
 import org.skife.jdbi.v2.Query;
 import org.skife.jdbi.v2.ResultIterator;
-import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.util.ByteArrayMapper;
@@ -113,6 +129,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ExecutionException;
@@ -169,7 +186,9 @@ public final class DruidStorageHandlerUtils {
                 new TrimExprMacro.BothTrimExprMacro(),
                 new TrimExprMacro.LeftTrimExprMacro(),
                 new TrimExprMacro.RightTrimExprMacro()
-            )));
+            )))
+            .addValue(ObjectMapper.class, JSON_MAPPER);
+
     JSON_MAPPER.setInjectableValues(injectableValues);
     SMILE_MAPPER.setInjectableValues(injectableValues);
     HiveDruidSerializationModule hiveDruidSerializationModule = new HiveDruidSerializationModule();
@@ -177,6 +196,7 @@ public final class DruidStorageHandlerUtils {
     SMILE_MAPPER.registerModule(hiveDruidSerializationModule);
     // Register the shard sub type to be used by the mapper
     JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, "linear"));
+    JSON_MAPPER.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered"));
     // set the timezone of the object mapper
     // THIS IS NOT WORKING workaround is to set it as part of java opts -Duser.timezone="UTC"
     JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -755,4 +775,100 @@ public final class DruidStorageHandlerUtils {
   public static Path getPath(DataSegment dataSegment) {
     return new Path(String.valueOf(dataSegment.getLoadSpec().get("path")));
   }
+
+  public static GranularitySpec getGranularitySpec(Configuration configuration, Properties tableProperties) {
+    final String segmentGranularity =
+        tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ?
+            tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) :
+            HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY);
+    return new UniformGranularitySpec(
+        Granularity.fromString(segmentGranularity),
+        Granularity.fromString(
+            tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY) == null
+                ? "NONE"
+                : tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY)),
+        null
+    );
+  }
+
+  public static IndexSpec getIndexSpec(Configuration jc) {
+    IndexSpec indexSpec;
+    if ("concise".equals(HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BITMAP_FACTORY_TYPE))) {
+      indexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null);
+    } else {
+      indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null);
+    }
+    return indexSpec;
+  }
+
+  public static Pair<List<DimensionSchema>, AggregatorFactory[]> getDimensionsAndAggregates(Configuration jc, List<String> columnNames,
+      List<TypeInfo> columnTypes) {
+    // By default, all columns that are neither metrics nor the timestamp are treated as dimensions.
+    final List<DimensionSchema> dimensions = new ArrayList<>();
+    ImmutableList.Builder<AggregatorFactory> aggregatorFactoryBuilder = ImmutableList.builder();
+    final boolean approximationAllowed = HiveConf
+        .getBoolVar(jc, HiveConf.ConfVars.HIVE_DRUID_APPROX_RESULT);
+    for (int i = 0; i < columnTypes.size(); i++) {
+      final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) columnTypes
+          .get(i)).getPrimitiveCategory();
+      AggregatorFactory af;
+      switch (primitiveCategory) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        af = new LongSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
+        break;
+      case FLOAT:
+      case DOUBLE:
+        af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
+        break;
+      case DECIMAL:
+        if (approximationAllowed) {
+          af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
+        } else {
+          throw new UnsupportedOperationException(
+              String.format("Druid does not support decimal column type." +
+                      "Either cast column [%s] to double or Enable Approximate Result for Druid by setting property [%s] to true",
+                  columnNames.get(i), HiveConf.ConfVars.HIVE_DRUID_APPROX_RESULT.varname));
+        }
+        break;
+      case TIMESTAMP:
+        // Granularity column
+        String tColumnName = columnNames.get(i);
+        if (!tColumnName.equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME) &&
+            !tColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
+          throw new IllegalArgumentException(
+              "Dimension " + tColumnName + " does not have STRING type: " +
+                  primitiveCategory);
+        }
+        continue;
+      case TIMESTAMPLOCALTZ:
+        // Druid timestamp column
+        String tLocalTZColumnName = columnNames.get(i);
+        if (!tLocalTZColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
+          throw new IllegalArgumentException(
+              "Dimension " + tLocalTZColumnName + " does not have STRING type: " +
+                  primitiveCategory);
+        }
+        continue;
+      default:
+        // Dimension
+        String dColumnName = columnNames.get(i);
+        if (PrimitiveObjectInspectorUtils.getPrimitiveGrouping(primitiveCategory) !=
+            PrimitiveObjectInspectorUtils.PrimitiveGrouping.STRING_GROUP
+            && primitiveCategory != PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN) {
+          throw new IllegalArgumentException(
+              "Dimension " + dColumnName + " does not have STRING type: " +
+                  primitiveCategory);
+        }
+        dimensions.add(new StringDimensionSchema(dColumnName));
+        continue;
+      }
+      aggregatorFactoryBuilder.add(af);
+    }
+    ImmutableList<AggregatorFactory> aggregatorFactories = aggregatorFactoryBuilder.build();
+    return Pair.of(dimensions,
+        aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()]));
+  }
 }
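
getDimensionsAndAggregates() is now shared between the batch path (DruidOutputFormat) and the Kafka supervisor spec: integral Hive columns become longSum metrics, float/double columns become doubleSum metrics, decimal is only accepted when HiveConf.ConfVars.HIVE_DRUID_APPROX_RESULT is enabled, the timestamp/granularity columns are skipped, and string or boolean columns become Druid dimensions. A sketch of a table exercising these rules (names and values are illustrative):

  CREATE TABLE druid_kafka_metrics (
    `__time` TIMESTAMP WITH LOCAL TIME ZONE,  -- Druid timestamp column
    `country` STRING,                         -- dimension
    `is_robot` BOOLEAN,                       -- dimension (boolean is explicitly allowed)
    `added` BIGINT,                           -- metric: longSum aggregator
    `delta` DOUBLE)                           -- metric: doubleSum aggregator
  STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
  TBLPROPERTIES (
    "kafka.topic" = "test-topic",
    "kafka.bootstrap.servers" = "localhost:9092");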

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index b758efd..15a08eb 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -17,28 +17,18 @@
  */
 package org.apache.hadoop.hive.druid.io;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
 import io.druid.data.input.impl.MapInputRowParser;
-import io.druid.data.input.impl.StringDimensionSchema;
 import io.druid.data.input.impl.TimeAndDimsParseSpec;
 import io.druid.data.input.impl.TimestampSpec;
-import io.druid.java.util.common.granularity.Granularity;
+import io.druid.java.util.common.Pair;
 import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.DoubleSumAggregatorFactory;
-import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.segment.IndexSpec;
-import io.druid.segment.data.ConciseBitmapSerdeFactory;
-import io.druid.segment.data.RoaringBitmapSerdeFactory;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.indexing.granularity.GranularitySpec;
-import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.realtime.plumber.CustomVersioningPolicy;
 
 import org.apache.commons.lang.StringUtils;
@@ -51,10 +41,6 @@ import org.apache.hadoop.hive.druid.serde.DruidWritable;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.Writable;
@@ -62,6 +48,12 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.util.Progressable;
 
+import static org.apache.hadoop.hive.druid.DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,8 +64,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.hadoop.hive.druid.DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME;
-
 public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritable> {
 
   protected static final Logger LOG = LoggerFactory.getLogger(DruidOutputFormat.class);
@@ -88,10 +78,7 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
           Progressable progress
   ) throws IOException {
 
-    final String segmentGranularity =
-            tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ?
-                    tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) :
-                    HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY);
+
     final int targetNumShardsPerGranularity = Integer.parseUnsignedInt(
         tableProperties.getProperty(Constants.DRUID_TARGET_SHARDS_PER_GRANULARITY, "0"));
     final int maxPartitionSize = targetNumShardsPerGranularity > 0 ? -1 : HiveConf
@@ -104,14 +91,7 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
         : tableProperties.getProperty(Constants.DRUID_DATA_SOURCE);
     final String segmentDirectory = jc.get(Constants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY);
 
-    final GranularitySpec granularitySpec = new UniformGranularitySpec(
-            Granularity.fromString(segmentGranularity),
-            Granularity.fromString(
-                    tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY) == null
-                            ? "NONE"
-                            : tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY)),
-            null
-    );
+    final GranularitySpec granularitySpec = DruidStorageHandlerUtils.getGranularitySpec(jc, tableProperties);
 
     final String columnNameProperty = tableProperties.getProperty(serdeConstants.LIST_COLUMNS);
     final String columnTypeProperty = tableProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
@@ -122,10 +102,7 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
                       columnNameProperty, columnTypeProperty
               ));
     }
-    ArrayList<String> columnNames = new ArrayList<String>();
-    for (String name : columnNameProperty.split(",")) {
-      columnNames.add(name);
-    }
+    ArrayList<String> columnNames = Lists.newArrayList(columnNameProperty.split(","));
     if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
       throw new IllegalStateException("Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN +
               "') not specified in create table; list of columns is : " +
@@ -133,69 +110,11 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
     }
     ArrayList<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
 
-    final boolean approximationAllowed = HiveConf.getBoolVar(jc, HiveConf.ConfVars.HIVE_DRUID_APPROX_RESULT);
-    // Default, all columns that are not metrics or timestamp, are treated as dimensions
-    final List<DimensionSchema> dimensions = new ArrayList<>();
-    ImmutableList.Builder<AggregatorFactory> aggregatorFactoryBuilder = ImmutableList.builder();
-    for (int i = 0; i < columnTypes.size(); i++) {
-      final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) columnTypes
-              .get(i)).getPrimitiveCategory();
-      AggregatorFactory af;
-      switch (primitiveCategory) {
-        case BYTE:
-        case SHORT:
-        case INT:
-        case LONG:
-          af = new LongSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
-          break;
-        case FLOAT:
-        case DOUBLE:
-          af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
-          break;
-        case DECIMAL:
-          if (approximationAllowed) {
-            af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i));
-          } else {
-            throw new UnsupportedOperationException(
-                String.format("Druid does not support decimal column type." +
-                        "Either cast column [%s] to double or Enable Approximate Result for Druid by setting property [%s] to true",
-                    columnNames.get(i), HiveConf.ConfVars.HIVE_DRUID_APPROX_RESULT.varname));
-          }
-          break;
-        case TIMESTAMP:
-          // Granularity column
-          String tColumnName = columnNames.get(i);
-          if (!tColumnName.equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME)) {
-            throw new IOException("Dimension " + tColumnName + " does not have STRING type: " +
-                    primitiveCategory);
-          }
-          continue;
-        case TIMESTAMPLOCALTZ:
-          // Druid timestamp column
-          String tLocalTZColumnName = columnNames.get(i);
-          if (!tLocalTZColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) {
-            throw new IOException("Dimension " + tLocalTZColumnName + " does not have STRING type: " +
-                    primitiveCategory);
-          }
-          continue;
-        default:
-          // Dimension
-          String dColumnName = columnNames.get(i);
-          if (PrimitiveObjectInspectorUtils.getPrimitiveGrouping(primitiveCategory) !=
-                  PrimitiveGrouping.STRING_GROUP
-                  && primitiveCategory != PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN) {
-            throw new IOException("Dimension " + dColumnName + " does not have STRING type: " +
-                    primitiveCategory);
-          }
-          dimensions.add(new StringDimensionSchema(dColumnName));
-          continue;
-      }
-      aggregatorFactoryBuilder.add(af);
-    }
-    List<AggregatorFactory> aggregatorFactories = aggregatorFactoryBuilder.build();
+    Pair<List<DimensionSchema>, AggregatorFactory[]> dimensionsAndAggregates = DruidStorageHandlerUtils
+        .getDimensionsAndAggregates(jc, columnNames, columnTypes);
     final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
             new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
-            new DimensionsSpec(dimensions, Lists
+            new DimensionsSpec(dimensionsAndAggregates.lhs, Lists
                 .newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
                     Constants.DRUID_SHARD_KEY_COL_NAME
                 ), null
@@ -208,7 +127,7 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
     final DataSchema dataSchema = new DataSchema(
             Preconditions.checkNotNull(dataSource, "Data source name is null"),
             inputParser,
-            aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()]),
+            dimensionsAndAggregates.rhs,
             granularitySpec,
             DruidStorageHandlerUtils.JSON_MAPPER
     );
@@ -222,12 +141,7 @@ public class DruidOutputFormat<K, V> implements HiveOutputFormat<K, DruidWritabl
     }
     Integer maxRowInMemory = HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_ROW_IN_MEMORY);
 
-    IndexSpec indexSpec;
-    if ("concise".equals(HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BITMAP_FACTORY_TYPE))) {
-      indexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null);
-    } else {
-      indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null);
-    }
+    IndexSpec indexSpec = DruidStorageHandlerUtils.getIndexSpec(jc);
     RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(maxRowInMemory,
             null,
             null,

http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java
new file mode 100644
index 0000000..425a5bb
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.json;
+
+import io.druid.java.util.common.StringUtils;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import java.util.Map;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public class KafkaSupervisorIOConfig
+{
+  public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
+
+  private final String topic;
+  private final Integer replicas;
+  private final Integer taskCount;
+  private final Duration taskDuration;
+  private final Map<String, String> consumerProperties;
+  private final Duration startDelay;
+  private final Duration period;
+  private final boolean useEarliestOffset;
+  private final Duration completionTimeout;
+  private final Optional<Duration> lateMessageRejectionPeriod;
+  private final Optional<Duration> earlyMessageRejectionPeriod;
+  private final boolean skipOffsetGaps;
+
+  @JsonCreator
+  public KafkaSupervisorIOConfig(
+      @JsonProperty("topic") String topic,
+      @JsonProperty("replicas") Integer replicas,
+      @JsonProperty("taskCount") Integer taskCount,
+      @JsonProperty("taskDuration") Period taskDuration,
+      @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
+      @JsonProperty("startDelay") Period startDelay,
+      @JsonProperty("period") Period period,
+      @JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
+      @JsonProperty("completionTimeout") Period completionTimeout,
+      @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
+      @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
+      @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
+  )
+  {
+    this.topic = Preconditions.checkNotNull(topic, "topic");
+    this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
+    Preconditions.checkNotNull(
+        consumerProperties.get(BOOTSTRAP_SERVERS_KEY),
+        StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY)
+    );
+
+    this.replicas = replicas != null ? replicas : 1;
+    this.taskCount = taskCount != null ? taskCount : 1;
+    this.taskDuration = defaultDuration(taskDuration, "PT1H");
+    this.startDelay = defaultDuration(startDelay, "PT5S");
+    this.period = defaultDuration(period, "PT30S");
+    this.useEarliestOffset = useEarliestOffset != null ? useEarliestOffset : false;
+    this.completionTimeout = defaultDuration(completionTimeout, "PT30M");
+    this.lateMessageRejectionPeriod = lateMessageRejectionPeriod == null
+                                      ? Optional.<Duration>absent()
+                                      : Optional.of(lateMessageRejectionPeriod.toStandardDuration());
+    this.earlyMessageRejectionPeriod = earlyMessageRejectionPeriod == null
+                                       ? Optional.<Duration>absent()
+                                       : Optional.of(earlyMessageRejectionPeriod.toStandardDuration());
+    this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : false;
+  }
+
+  @JsonProperty
+  public String getTopic()
+  {
+    return topic;
+  }
+
+  @JsonProperty
+  public Integer getReplicas()
+  {
+    return replicas;
+  }
+
+  @JsonProperty
+  public Integer getTaskCount()
+  {
+    return taskCount;
+  }
+
+  @JsonProperty
+  public Duration getTaskDuration()
+  {
+    return taskDuration;
+  }
+
+  @JsonProperty
+  public Map<String, String> getConsumerProperties()
+  {
+    return consumerProperties;
+  }
+
+  @JsonProperty
+  public Duration getStartDelay()
+  {
+    return startDelay;
+  }
+
+  @JsonProperty
+  public Duration getPeriod()
+  {
+    return period;
+  }
+
+  @JsonProperty
+  public boolean isUseEarliestOffset()
+  {
+    return useEarliestOffset;
+  }
+
+  @JsonProperty
+  public Duration getCompletionTimeout()
+  {
+    return completionTimeout;
+  }
+
+  @JsonProperty
+  public Optional<Duration> getEarlyMessageRejectionPeriod()
+  {
+    return earlyMessageRejectionPeriod;
+  }
+
+  @JsonProperty
+  public Optional<Duration> getLateMessageRejectionPeriod()
+  {
+    return lateMessageRejectionPeriod;
+  }
+
+  @JsonProperty
+  public boolean isSkipOffsetGaps()
+  {
+    return skipOffsetGaps;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KafkaSupervisorIOConfig{" +
+           "topic='" + topic + '\'' +
+           ", replicas=" + replicas +
+           ", taskCount=" + taskCount +
+           ", taskDuration=" + taskDuration +
+           ", consumerProperties=" + consumerProperties +
+           ", startDelay=" + startDelay +
+           ", period=" + period +
+           ", useEarliestOffset=" + useEarliestOffset +
+           ", completionTimeout=" + completionTimeout +
+           ", lateMessageRejectionPeriod=" + lateMessageRejectionPeriod +
+           ", earlyMessageRejectionPeriod=" + earlyMessageRejectionPeriod +
+           ", skipOffsetGaps=" + skipOffsetGaps +
+           '}';
+  }
+
+  private static Duration defaultDuration(final Period period, final String theDefault)
+  {
+    return (period == null ? new Period(theDefault) : period).toStandardDuration();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (o == null || getClass() != o.getClass())
+      return false;
+
+    KafkaSupervisorIOConfig that = (KafkaSupervisorIOConfig) o;
+
+    if (useEarliestOffset != that.useEarliestOffset)
+      return false;
+    if (skipOffsetGaps != that.skipOffsetGaps)
+      return false;
+    if (topic != null ? !topic.equals(that.topic) : that.topic != null)
+      return false;
+    if (replicas != null ? !replicas.equals(that.replicas) : that.replicas != null)
+      return false;
+    if (taskCount != null ? !taskCount.equals(that.taskCount) : that.taskCount != null)
+      return false;
+    if (taskDuration != null ? !taskDuration.equals(that.taskDuration) : that.taskDuration != null)
+      return false;
+    if (consumerProperties != null ?
+        !consumerProperties.equals(that.consumerProperties) :
+        that.consumerProperties != null)
+      return false;
+    if (startDelay != null ? !startDelay.equals(that.startDelay) : that.startDelay != null)
+      return false;
+    if (period != null ? !period.equals(that.period) : that.period != null)
+      return false;
+    if (completionTimeout != null ?
+        !completionTimeout.equals(that.completionTimeout) :
+        that.completionTimeout != null)
+      return false;
+    if (lateMessageRejectionPeriod != null ?
+        !lateMessageRejectionPeriod.equals(that.lateMessageRejectionPeriod) :
+        that.lateMessageRejectionPeriod != null)
+      return false;
+    return earlyMessageRejectionPeriod != null ?
+        earlyMessageRejectionPeriod.equals(that.earlyMessageRejectionPeriod) :
+        that.earlyMessageRejectionPeriod == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = topic != null ? topic.hashCode() : 0;
+    result = 31 * result + (replicas != null ? replicas.hashCode() : 0);
+    result = 31 * result + (taskCount != null ? taskCount.hashCode() : 0);
+    result = 31 * result + (taskDuration != null ? taskDuration.hashCode() : 0);
+    result = 31 * result + (consumerProperties != null ? consumerProperties.hashCode() : 0);
+    result = 31 * result + (startDelay != null ? startDelay.hashCode() : 0);
+    result = 31 * result + (period != null ? period.hashCode() : 0);
+    result = 31 * result + (useEarliestOffset ? 1 : 0);
+    result = 31 * result + (completionTimeout != null ? completionTimeout.hashCode() : 0);
+    result = 31 * result +
+        (lateMessageRejectionPeriod != null ? lateMessageRejectionPeriod.hashCode() : 0);
+    result = 31 * result +
+        (earlyMessageRejectionPeriod != null ? earlyMessageRejectionPeriod.hashCode() : 0);
+    result = 31 * result + (skipOffsetGaps ? 1 : 0);
+    return result;
+  }
+}

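The ioConfig above is a plain Jackson-annotated POJO, so its required-field check and its defaulting behaviour can be exercised directly. The following is a minimal sketch, not part of this patch, with a placeholder topic and broker address; it builds a config with only the mandatory fields and prints the defaults the supervisor would run with:

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig;

public class IOConfigDefaultsSketch {
  public static void main(String[] args) {
    // topic and consumerProperties are mandatory, and consumerProperties must
    // contain a "bootstrap.servers" entry or the constructor's precondition fails.
    KafkaSupervisorIOConfig ioConfig = new KafkaSupervisorIOConfig(
        "wiki_kafka_topic",                                      // placeholder topic
        null, null, null,                                        // replicas, taskCount, taskDuration -> defaults
        ImmutableMap.of("bootstrap.servers", "localhost:9092"),  // placeholder broker list
        null, null, null, null, null, null, null);               // remaining fields -> defaults
    // Prints replicas=1, taskCount=1, taskDuration=PT3600S, startDelay=PT5S,
    // period=PT30S, useEarliestOffset=false, completionTimeout=PT1800S, ...
    System.out.println(ioConfig);
  }
}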
http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java
new file mode 100644
index 0000000..081bc27
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import io.druid.segment.indexing.DataSchema;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.base.Preconditions;
+
+import java.util.Map;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+@JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    property = "type"
+)
+@JsonSubTypes({@JsonSubTypes.Type(
+    name = "kafka",
+    value = KafkaSupervisorSpec.class
+)})
+public class KafkaSupervisorSpec
+{
+  private final DataSchema dataSchema;
+  private final KafkaSupervisorTuningConfig tuningConfig;
+  private final KafkaSupervisorIOConfig ioConfig;
+  private final Map<String, Object> context;
+
+  @JsonCreator
+  public KafkaSupervisorSpec(
+      @JsonProperty("dataSchema") DataSchema dataSchema,
+      @JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig,
+      @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig,
+      @JsonProperty("context") Map<String, Object> context
+  )
+  {
+    this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
+    this.tuningConfig = tuningConfig != null
+                        ? tuningConfig
+                        : new KafkaSupervisorTuningConfig(
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null
+                        );
+    this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
+    this.context = context;
+  }
+
+  @JsonProperty
+  public DataSchema getDataSchema()
+  {
+    return dataSchema;
+  }
+
+  @JsonProperty
+  public KafkaSupervisorTuningConfig getTuningConfig()
+  {
+    return tuningConfig;
+  }
+
+  @JsonProperty
+  public KafkaSupervisorIOConfig getIoConfig()
+  {
+    return ioConfig;
+  }
+
+  @JsonProperty
+  public Map<String, Object> getContext()
+  {
+    return context;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KafkaSupervisorSpec{" +
+           "dataSchema=" + dataSchema +
+           ", tuningConfig=" + tuningConfig +
+           ", ioConfig=" + ioConfig +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (o == null || getClass() != o.getClass())
+      return false;
+
+    KafkaSupervisorSpec that = (KafkaSupervisorSpec) o;
+
+    if (dataSchema != null ? !dataSchema.equals(that.dataSchema) : that.dataSchema != null)
+      return false;
+    if (tuningConfig != null ? !tuningConfig.equals(that.tuningConfig) : that.tuningConfig != null)
+      return false;
+    if (ioConfig != null ? !ioConfig.equals(that.ioConfig) : that.ioConfig != null)
+      return false;
+    return context != null ? context.equals(that.context) : that.context == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = dataSchema != null ? dataSchema.hashCode() : 0;
+    result = 31 * result + (tuningConfig != null ? tuningConfig.hashCode() : 0);
+    result = 31 * result + (ioConfig != null ? ioConfig.hashCode() : 0);
+    result = 31 * result + (context != null ? context.hashCode() : 0);
+    return result;
+  }
+}

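A KafkaSupervisorSpec is the document that, once serialized, describes a Kafka ingestion supervisor to Druid; the @JsonTypeInfo/@JsonSubTypes annotations above make it serialize with a top-level "type":"kafka" discriminator. The sketch below is illustrative only: the host, datasource and topic names are placeholders, the JSON body is heavily abbreviated (a real spec also needs the parser, granularitySpec and aggregators inside dataSchema, which the storage handler is expected to derive from the Hive table definition), and it assumes Druid's usual overlord supervisor endpoint.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SubmitSupervisorSketch {
  public static void main(String[] args) throws Exception {
    // Abbreviated, hand-written spec body matching the field names of
    // KafkaSupervisorSpec / KafkaSupervisorIOConfig above.
    String spec = "{"
        + "\"type\":\"kafka\","
        + "\"dataSchema\":{\"dataSource\":\"example_datasource\"},"
        + "\"ioConfig\":{\"topic\":\"example_topic\","
        + "\"consumerProperties\":{\"bootstrap.servers\":\"localhost:9092\"}}"
        + "}";
    // Placeholder overlord address; /druid/indexer/v1/supervisor is the
    // standard Druid supervisor submission endpoint.
    HttpURLConnection conn = (HttpURLConnection)
        new URL("http://localhost:8090/druid/indexer/v1/supervisor").openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream os = conn.getOutputStream()) {
      os.write(spec.getBytes(StandardCharsets.UTF_8));
    }
    System.out.println("Overlord responded with HTTP " + conn.getResponseCode());
  }
}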
http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java
new file mode 100644
index 0000000..a918df4
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import io.druid.segment.IndexSpec;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import java.io.File;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+@JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    property = "type"
+)
+@JsonSubTypes({@JsonSubTypes.Type(
+    name = "kafka",
+    value = KafkaSupervisorTuningConfig.class
+)})
+public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
+{
+  private final Integer workerThreads;
+  private final Integer chatThreads;
+  private final Long chatRetries;
+  private final Duration httpTimeout;
+  private final Duration shutdownTimeout;
+  private final Duration offsetFetchPeriod;
+
+  public KafkaSupervisorTuningConfig(
+      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+      @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
+      @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
+      @JsonProperty("basePersistDirectory") File basePersistDirectory,
+      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
+      @JsonProperty("indexSpec") IndexSpec indexSpec,
+      // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
+      @JsonProperty("buildV9Directly") Boolean buildV9Directly,
+      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
+      @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, // for backward compatibility
+      @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
+      @JsonProperty("workerThreads") Integer workerThreads,
+      @JsonProperty("chatThreads") Integer chatThreads,
+      @JsonProperty("chatRetries") Long chatRetries,
+      @JsonProperty("httpTimeout") Period httpTimeout,
+      @JsonProperty("shutdownTimeout") Period shutdownTimeout,
+      @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod
+  )
+  {
+    super(
+        maxRowsInMemory,
+        maxRowsPerSegment,
+        intermediatePersistPeriod,
+        basePersistDirectory,
+        maxPendingPersists,
+        indexSpec,
+        true,
+        reportParseExceptions,
+        // Supervised kafka tasks should respect KafkaSupervisorIOConfig.completionTimeout instead of
+        // handoffConditionTimeout
+        handoffConditionTimeout,
+        resetOffsetAutomatically
+    );
+
+    this.workerThreads = workerThreads;
+    this.chatThreads = chatThreads;
+    this.chatRetries = (chatRetries != null ? chatRetries : 8);
+    this.httpTimeout = defaultDuration(httpTimeout, "PT10S");
+    this.shutdownTimeout = defaultDuration(shutdownTimeout, "PT80S");
+    this.offsetFetchPeriod = defaultDuration(offsetFetchPeriod, "PT30S");
+  }
+
+  @JsonProperty
+  public Integer getWorkerThreads()
+  {
+    return workerThreads;
+  }
+
+  @JsonProperty
+  public Integer getChatThreads()
+  {
+    return chatThreads;
+  }
+
+  @JsonProperty
+  public Long getChatRetries()
+  {
+    return chatRetries;
+  }
+
+  @JsonProperty
+  public Duration getHttpTimeout()
+  {
+    return httpTimeout;
+  }
+
+  @JsonProperty
+  public Duration getShutdownTimeout()
+  {
+    return shutdownTimeout;
+  }
+
+  @JsonProperty
+  public Duration getOffsetFetchPeriod()
+  {
+    return offsetFetchPeriod;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KafkaSupervisorTuningConfig{" +
+        "maxRowsInMemory=" + getMaxRowsInMemory() +
+        ", maxRowsPerSegment=" + getMaxRowsPerSegment() +
+        ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
+        ", basePersistDirectory=" + getBasePersistDirectory() +
+        ", maxPendingPersists=" + getMaxPendingPersists() +
+        ", indexSpec=" + getIndexSpec() +
+        ", reportParseExceptions=" + isReportParseExceptions() +
+        ", handoffConditionTimeout=" + getHandoffConditionTimeout() +
+        ", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
+        ", workerThreads=" + workerThreads +
+        ", chatThreads=" + chatThreads +
+        ", chatRetries=" + chatRetries +
+        ", httpTimeout=" + httpTimeout +
+        ", shutdownTimeout=" + shutdownTimeout +
+        ", offsetFetchPeriod=" + offsetFetchPeriod +
+        '}';
+  }
+
+  private static Duration defaultDuration(final Period period, final String theDefault)
+  {
+    return (period == null ? new Period(theDefault) : period).toStandardDuration();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (o == null || getClass() != o.getClass())
+      return false;
+    if (!super.equals(o))
+      return false;
+
+    KafkaSupervisorTuningConfig that = (KafkaSupervisorTuningConfig) o;
+
+    if (workerThreads != null ?
+        !workerThreads.equals(that.workerThreads) :
+        that.workerThreads != null)
+      return false;
+    if (chatThreads != null ? !chatThreads.equals(that.chatThreads) : that.chatThreads != null)
+      return false;
+    if (chatRetries != null ? !chatRetries.equals(that.chatRetries) : that.chatRetries != null)
+      return false;
+    if (httpTimeout != null ? !httpTimeout.equals(that.httpTimeout) : that.httpTimeout != null)
+      return false;
+    if (shutdownTimeout != null ?
+        !shutdownTimeout.equals(that.shutdownTimeout) :
+        that.shutdownTimeout != null)
+      return false;
+    return offsetFetchPeriod != null ?
+        offsetFetchPeriod.equals(that.offsetFetchPeriod) :
+        that.offsetFetchPeriod == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = super.hashCode();
+    result = 31 * result + (workerThreads != null ? workerThreads.hashCode() : 0);
+    result = 31 * result + (chatThreads != null ? chatThreads.hashCode() : 0);
+    result = 31 * result + (chatRetries != null ? chatRetries.hashCode() : 0);
+    result = 31 * result + (httpTimeout != null ? httpTimeout.hashCode() : 0);
+    result = 31 * result + (shutdownTimeout != null ? shutdownTimeout.hashCode() : 0);
+    result = 31 * result + (offsetFetchPeriod != null ? offsetFetchPeriod.hashCode() : 0);
+    return result;
+  }
+}

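When a supervisor spec arrives without a tuningConfig section, KafkaSupervisorSpec substitutes an all-null KafkaSupervisorTuningConfig, so every value comes from the defaults wired into the constructors above. A small sketch, not part of the patch, that reproduces that fallback and shows the resulting values:

import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig;

public class TuningDefaultsSketch {
  public static void main(String[] args) {
    // Same sixteen-null construction KafkaSupervisorSpec performs when the
    // spec carries no "tuningConfig" section.
    KafkaSupervisorTuningConfig tuning = new KafkaSupervisorTuningConfig(
        null, null, null, null, null, null, null, null,
        null, null, null, null, null, null, null, null);
    // chatRetries defaults to 8, httpTimeout to PT10S, shutdownTimeout to PT80S
    // and offsetFetchPeriod to PT30S; the row/persist settings fall back to
    // Druid's RealtimeTuningConfig defaults via the KafkaTuningConfig super call.
    System.out.println(tuning);
  }
}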
http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
new file mode 100644
index 0000000..ea23ddd
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import io.druid.segment.IndexSpec;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.realtime.appenderator.AppenderatorConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import org.joda.time.Period;
+
+import java.io.File;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public class KafkaTuningConfig implements AppenderatorConfig
+{
+  private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
+  private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
+
+  private final int maxRowsInMemory;
+  private final int maxRowsPerSegment;
+  private final Period intermediatePersistPeriod;
+  private final File basePersistDirectory;
+  private final int maxPendingPersists;
+  private final IndexSpec indexSpec;
+  private final boolean reportParseExceptions;
+  @Deprecated
+  private final long handoffConditionTimeout;
+  private final boolean resetOffsetAutomatically;
+
+  @JsonCreator
+  public KafkaTuningConfig(
+      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+      @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
+      @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
+      @JsonProperty("basePersistDirectory") File basePersistDirectory,
+      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
+      @JsonProperty("indexSpec") IndexSpec indexSpec,
+      // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
+      @JsonProperty("buildV9Directly") Boolean buildV9Directly,
+      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
+      @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
+      @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically
+  )
+  {
+    // Cannot be a static because default basePersistDirectory is unique per-instance
+    final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
+
+    this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
+    this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment;
+    this.intermediatePersistPeriod = intermediatePersistPeriod == null
+        ? defaults.getIntermediatePersistPeriod()
+        : intermediatePersistPeriod;
+    this.basePersistDirectory = defaults.getBasePersistDirectory();
+    this.maxPendingPersists = maxPendingPersists == null ? defaults.getMaxPendingPersists() : maxPendingPersists;
+    this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
+    this.reportParseExceptions = reportParseExceptions == null
+        ? defaults.isReportParseExceptions()
+        : reportParseExceptions;
+    this.handoffConditionTimeout = handoffConditionTimeout == null
+        ? defaults.getHandoffConditionTimeout()
+        : handoffConditionTimeout;
+    this.resetOffsetAutomatically = resetOffsetAutomatically == null
+        ? DEFAULT_RESET_OFFSET_AUTOMATICALLY
+        : resetOffsetAutomatically;
+  }
+
+  public static KafkaTuningConfig copyOf(KafkaTuningConfig config)
+  {
+    return new KafkaTuningConfig(
+        config.maxRowsInMemory,
+        config.maxRowsPerSegment,
+        config.intermediatePersistPeriod,
+        config.basePersistDirectory,
+        config.maxPendingPersists,
+        config.indexSpec,
+        true,
+        config.reportParseExceptions,
+        config.handoffConditionTimeout,
+        config.resetOffsetAutomatically
+    );
+  }
+
+  @Override
+  @JsonProperty
+  public int getMaxRowsInMemory()
+  {
+    return maxRowsInMemory;
+  }
+
+  @JsonProperty
+  public int getMaxRowsPerSegment()
+  {
+    return maxRowsPerSegment;
+  }
+
+  @Override
+  @JsonProperty
+  public Period getIntermediatePersistPeriod()
+  {
+    return intermediatePersistPeriod;
+  }
+
+  @Override
+  @JsonProperty
+  public File getBasePersistDirectory()
+  {
+    return basePersistDirectory;
+  }
+
+  @Override
+  @JsonProperty
+  public int getMaxPendingPersists()
+  {
+    return maxPendingPersists;
+  }
+
+  @Override
+  @JsonProperty
+  public IndexSpec getIndexSpec()
+  {
+    return indexSpec;
+  }
+
+  /**
+   * Always returns true, doesn't affect the version being built.
+   */
+  @Deprecated
+  @JsonProperty
+  public boolean getBuildV9Directly()
+  {
+    return true;
+  }
+
+  @Override
+  @JsonProperty
+  public boolean isReportParseExceptions()
+  {
+    return reportParseExceptions;
+  }
+
+  @Deprecated
+  @JsonProperty
+  public long getHandoffConditionTimeout()
+  {
+    return handoffConditionTimeout;
+  }
+
+  @JsonProperty
+  public boolean isResetOffsetAutomatically()
+  {
+    return resetOffsetAutomatically;
+  }
+
+  public KafkaTuningConfig withBasePersistDirectory(File dir)
+  {
+    return new KafkaTuningConfig(
+        maxRowsInMemory,
+        maxRowsPerSegment,
+        intermediatePersistPeriod,
+        dir,
+        maxPendingPersists,
+        indexSpec,
+        true,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        resetOffsetAutomatically
+    );
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    KafkaTuningConfig that = (KafkaTuningConfig) o;
+
+    if (maxRowsInMemory != that.maxRowsInMemory) {
+      return false;
+    }
+    if (maxRowsPerSegment != that.maxRowsPerSegment) {
+      return false;
+    }
+    if (maxPendingPersists != that.maxPendingPersists) {
+      return false;
+    }
+    if (reportParseExceptions != that.reportParseExceptions) {
+      return false;
+    }
+    if (handoffConditionTimeout != that.handoffConditionTimeout) {
+      return false;
+    }
+    if (resetOffsetAutomatically != that.resetOffsetAutomatically) {
+      return false;
+    }
+    if (intermediatePersistPeriod != null
+        ? !intermediatePersistPeriod.equals(that.intermediatePersistPeriod)
+        : that.intermediatePersistPeriod != null) {
+      return false;
+    }
+    if (basePersistDirectory != null
+        ? !basePersistDirectory.equals(that.basePersistDirectory)
+        : that.basePersistDirectory != null) {
+      return false;
+    }
+    return indexSpec != null ? indexSpec.equals(that.indexSpec) : that.indexSpec == null;
+
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = maxRowsInMemory;
+    result = 31 * result + maxRowsPerSegment;
+    result = 31 * result + (intermediatePersistPeriod != null ? intermediatePersistPeriod.hashCode() : 0);
+    result = 31 * result + (basePersistDirectory != null ? basePersistDirectory.hashCode() : 0);
+    result = 31 * result + maxPendingPersists;
+    result = 31 * result + (indexSpec != null ? indexSpec.hashCode() : 0);
+    result = 31 * result + (reportParseExceptions ? 1 : 0);
+    result = 31 * result + (int) (handoffConditionTimeout ^ (handoffConditionTimeout >>> 32));
+    result = 31 * result + (resetOffsetAutomatically ? 1 : 0);
+    return result;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KafkaTuningConfig{" +
+        "maxRowsInMemory=" + maxRowsInMemory +
+        ", maxRowsPerSegment=" + maxRowsPerSegment +
+        ", intermediatePersistPeriod=" + intermediatePersistPeriod +
+        ", basePersistDirectory=" + basePersistDirectory +
+        ", maxPendingPersists=" + maxPendingPersists +
+        ", indexSpec=" + indexSpec +
+        ", reportParseExceptions=" + reportParseExceptions +
+        ", handoffConditionTimeout=" + handoffConditionTimeout +
+        ", resetOffsetAutomatically=" + resetOffsetAutomatically +
+        '}';
+  }
+}

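Because KafkaTuningConfig is deserialized through its @JsonCreator constructor, a caller only has to supply the keys it wants to override; everything else falls back to Druid's RealtimeTuningConfig defaults. A hypothetical deserialization sketch follows; the property values are arbitrary, and it assumes jackson-datatype-joda is available on the classpath alongside the Druid dependencies.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import org.apache.hadoop.hive.druid.json.KafkaTuningConfig;

public class TuningConfigJsonSketch {
  public static void main(String[] args) throws Exception {
    // JodaModule handles the Period-typed properties of the creator.
    ObjectMapper mapper = new ObjectMapper().registerModule(new JodaModule());
    // Partial JSON: only two keys are overridden, the rest take the defaults
    // chosen in the @JsonCreator constructor.
    String json = "{\"maxRowsInMemory\":10000,\"maxRowsPerSegment\":1000000}";
    KafkaTuningConfig config = mapper.readValue(json, KafkaTuningConfig.class);
    System.out.println(config);  // toString shows the overridden and defaulted values
  }
}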
http://git-wip-us.apache.org/repos/asf/hive/blob/d3fed078/itests/qtest-druid/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index 20b7ea9..2e19ce5 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -43,6 +43,7 @@
     <druid.derby.version>10.11.1.1</druid.derby.version>
     <druid.guava.version>16.0.1</druid.guava.version>
     <druid.guice.version>4.1.0</druid.guice.version>
+    <kafka.version>0.10.2.0</kafka.version>
   </properties>
       <dependencies>
         <dependency>
@@ -104,6 +105,11 @@
           </exclusions>
         </dependency>
         <dependency>
+          <groupId>io.druid.extensions</groupId>
+          <artifactId>druid-kafka-indexing-service</artifactId>
+          <version>${druid.version}</version>
+        </dependency>
+        <dependency>
           <groupId>org.apache.logging.log4j</groupId>
           <artifactId>log4j-api</artifactId>
           <version>${log4j2.version}</version>
@@ -196,6 +202,11 @@
           <version>${junit.version}</version>
           <scope>test</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka_2.11</artifactId>
+          <version>${kafka.version}</version>
+        </dependency>
       </dependencies>
   <build>
 

