From: jcamacho@apache.org
To: commits@hive.apache.org
Reply-To: hive-dev@hive.apache.org
Subject: hive git commit: HIVE-15571: Support Insert into for druid storage handler (Nishant Bangarwa, reviewed by Slim Bouguerra, Jesus Camacho Rodriguez)
Date: Fri, 9 Jun 2017 10:43:52 +0000 (UTC)

Repository: hive
Updated Branches:
  refs/heads/master 46e5ea9ec -> 4a8eaa571

HIVE-15571: Support Insert into for druid storage handler (Nishant Bangarwa, reviewed by Slim Bouguerra, Jesus Camacho Rodriguez)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4a8eaa57
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4a8eaa57
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4a8eaa57

Branch: refs/heads/master
Commit: 4a8eaa57152c4f4802d136925d4f169e7befae81
Parents: 46e5ea9
Author: Nishant Bangarwa
Authored: Fri Jun 9 11:24:24 2017 +0100
Committer: Jesus Camacho Rodriguez
Committed: Fri Jun 9 11:43:22 2017 +0100

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/Constants.java  |   2 +
 .../hadoop/hive/druid/DruidStorageHandler.java  | 116 +++---
 .../hive/druid/DruidStorageHandlerUtils.java    | 362 ++++++++++++++----
 .../hadoop/hive/druid/io/DruidOutputFormat.java |  18 +-
 .../hive/druid/TestDruidStorageHandler.java     | 372 ++++++++++++++-----
 5 files changed, 638 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4a8eaa57/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 7695e02..794b697 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -36,6 +36,8 @@ public class Constants {
   public static final String DRUID_QUERY_TYPE = "druid.query.type";
   public static final String DRUID_QUERY_FETCH =
"druid.query.fetch"; public static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory"; + public static final String DRUID_SEGMENT_INTERMEDIATE_DIRECTORY = "druid.storage.storageDirectory.intermediate"; + public static final String DRUID_SEGMENT_VERSION = "druid.segment.version"; public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory"; http://git-wip-us.apache.org/repos/asf/hive/blob/4a8eaa57/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index 4ed4df1..05aab4a 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -17,22 +17,6 @@ */ package org.apache.hadoop.hive.druid; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Strings; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.base.Throwables; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; -import com.metamx.common.RetryUtils; -import com.metamx.common.lifecycle.Lifecycle; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.SQLMetadataConnector; @@ -40,6 +24,7 @@ import io.druid.metadata.storage.mysql.MySQLConnector; import io.druid.metadata.storage.postgresql.PostgreSQLConnector; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -66,13 +51,30 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hive.common.util.ShutdownHookManager; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Strings; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.base.Throwables; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.metamx.common.RetryUtils; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; + import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Period; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -81,6 +83,8 @@ import java.util.List; import java.util.Map; import 
java.util.concurrent.Callable; +import javax.annotation.Nullable; + /** * DruidStorageHandler provides a HiveStorageHandler implementation for Druid. */ @@ -92,7 +96,11 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor protected static final SessionState.LogHelper console = new SessionState.LogHelper(LOG); public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir"; + + public static final String INTERMEDIATE_SEGMENT_DIR_NAME = "intermediateSegmentDir"; + private static final HttpClient HTTP_CLIENT; + static { final Lifecycle lifecycle = new Lifecycle(); try { @@ -101,10 +109,9 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor LOG.error("Issues with lifecycle start", e); } HTTP_CLIENT = makeHttpClient(lifecycle); - ShutdownHookManager.addShutdownHook(()-> lifecycle.stop()); + ShutdownHookManager.addShutdownHook(() -> lifecycle.stop()); } - private final SQLMetadataConnector connector; private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig; @@ -232,7 +239,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor } Collection existingDataSources = DruidStorageHandlerUtils .getAllDataSourceNames(connector, druidMetadataStorageTablesConfig); - LOG.debug(String.format("pre-create data source with name [%s]", dataSourceName)); + LOG.debug("pre-create data source with name {}", dataSourceName); if (existingDataSources.contains(dataSourceName)) { throw new MetaException(String.format("Data source [%s] already existing", dataSourceName)); } @@ -264,36 +271,40 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor @Override public void commitCreateTable(Table table) throws MetaException { - LOG.debug(String.format("commit create table [%s]", table.getTableName())); + LOG.debug("commit create table {}", table.getTableName()); publishSegments(table, true); } - public void publishSegments(Table table, boolean overwrite) throws MetaException { if (MetaStoreUtils.isExternalTable(table)) { return; } Lifecycle lifecycle = new Lifecycle(); - LOG.info(String.format("Committing table [%s] to the druid metastore", table.getDbName())); + LOG.info("Committing table {} to the druid metastore", table.getDbName()); final Path tableDir = getSegmentDescriptorDir(); try { List segmentList = DruidStorageHandlerUtils .getPublishedSegments(tableDir, getConf()); - LOG.info(String.format("Found [%d] segments under path [%s]", segmentList.size(), tableDir)); + LOG.info("Found {} segments under path {}", segmentList.size(), tableDir); final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE); - + final String segmentDirectory = + table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null + ? 
table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) + : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY); DruidStorageHandlerUtils.publishSegments( connector, druidMetadataStorageTablesConfig, dataSourceName, segmentList, - DruidStorageHandlerUtils.JSON_MAPPER, - overwrite + overwrite, + segmentDirectory, + getConf() + ); final String coordinatorAddress = HiveConf .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS); int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES); - LOG.info(String.format("checking load status from coordinator [%s]", coordinatorAddress)); + LOG.info("checking load status from coordinator {}", coordinatorAddress); String coordinatorResponse = null; try { @@ -356,7 +367,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor public boolean apply(URL input) { try { String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input); - LOG.debug(String.format("Checking segment [%s] response is [%s]", input, result)); + LOG.debug("Checking segment {} response is {}", input, result); return Strings.isNullOrEmpty(result); } catch (IOException e) { LOG.error(String.format("Error while checking URL [%s]", input), e); @@ -393,10 +404,8 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor @VisibleForTesting protected void deleteSegment(DataSegment segment) throws SegmentLoadingException { - final Path path = getPath(segment); - LOG.info(String.format("removing segment[%s], located at path[%s]", segment.getIdentifier(), - path - )); + final Path path = DruidStorageHandlerUtils.getPath(segment); + LOG.info("removing segment {}, located at path {}", segment.getIdentifier(), path); try { if (path.getName().endsWith(".zip")) { @@ -404,10 +413,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor final FileSystem fs = path.getFileSystem(getConf()); if (!fs.exists(path)) { - LOG.warn(String.format( - "Segment Path [%s] does not exist. It appears to have been deleted already.", - path - )); + LOG.warn("Segment Path {} does not exist. 
It appears to have been deleted already.", path); return; } @@ -437,10 +443,6 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor } } - private static Path getPath(DataSegment dataSegment) { - return new Path(String.valueOf(dataSegment.getLoadSpec().get("path"))); - } - private static boolean safeNonRecursiveDelete(FileSystem fs, Path path) { try { return fs.delete(path, false); @@ -470,46 +472,37 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor ); if (deleteData == true) { - LOG.info(String.format("Dropping with purge all the data for data source [%s]", - dataSourceName - )); + LOG.info("Dropping with purge all the data for data source {}", dataSourceName); List dataSegmentList = DruidStorageHandlerUtils .getDataSegmentList(connector, druidMetadataStorageTablesConfig, dataSourceName); if (dataSegmentList.isEmpty()) { - LOG.info(String.format("Nothing to delete for data source [%s]", dataSourceName)); + LOG.info("Nothing to delete for data source {}", dataSourceName); return; } for (DataSegment dataSegment : dataSegmentList) { try { deleteSegment(dataSegment); } catch (SegmentLoadingException e) { - LOG.error(String.format("Error while deleting segment [%s]", dataSegment.getIdentifier()), - e - ); + LOG.error(String.format("Error while deleting segment [%s]", dataSegment.getIdentifier()), e); } } } if (DruidStorageHandlerUtils .disableDataSource(connector, druidMetadataStorageTablesConfig, dataSourceName)) { - LOG.info(String.format("Successfully dropped druid data source [%s]", dataSourceName)); + LOG.info("Successfully dropped druid data source {}", dataSourceName); } } @Override public void commitInsertTable(Table table, boolean overwrite) throws MetaException { - if (overwrite) { - LOG.debug(String.format("commit insert overwrite into table [%s]", table.getTableName())); - this.publishSegments(table, overwrite); - } else { - throw new MetaException("Insert into is not supported yet"); - } + LOG.debug("commit insert into table {} overwrite {}", table.getTableName(), + overwrite); + this.publishSegments(table, overwrite); } @Override public void preInsertTable(Table table, boolean overwrite) throws MetaException { - if (!overwrite) { - throw new MetaException("INSERT INTO statement is not allowed by druid storage handler"); - } + } @Override @@ -522,6 +515,9 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor ) { jobProperties.put(Constants.DRUID_SEGMENT_VERSION, new DateTime().toString()); jobProperties.put(Constants.DRUID_JOB_WORKING_DIRECTORY, getStagingWorkingDir().toString()); + // DruidOutputFormat will write segments in an intermediate directory + jobProperties.put(Constants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY, + getIntermediateSegmentDir().toString()); } @Override @@ -577,6 +573,10 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor return new Path(getStagingWorkingDir(), SEGMENTS_DESCRIPTOR_DIR_NAME); } + private Path getIntermediateSegmentDir() { + return new Path(getStagingWorkingDir(), INTERMEDIATE_SEGMENT_DIR_NAME); + } + private void cleanWorkingDir() { final FileSystem fileSystem; try { http://git-wip-us.apache.org/repos/asf/hive/blob/4a8eaa57/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 0e33836..5dd65b3 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -17,31 +17,7 @@ */ package org.apache.hadoop.hive.druid; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.util.VersionUtil; -import com.fasterxml.jackson.databind.InjectableValues; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.jsontype.TypeSerializer; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; -import com.google.common.collect.Lists; -import com.google.common.io.CharStreams; -import com.metamx.common.MapUtils; -import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.core.NoopEmitter; -import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.Request; -import com.metamx.http.client.response.InputStreamResponseHandler; +import io.druid.common.utils.JodaUtils; import io.druid.jackson.DefaultObjectMapper; import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.SQLMetadataConnector; @@ -51,9 +27,19 @@ import io.druid.query.select.SelectQueryConfig; import io.druid.segment.IndexIO; import io.druid.segment.IndexMergerV9; import io.druid.segment.column.ColumnConfig; +import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.loading.DataSegmentPusherUtil; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.storage.hdfs.HdfsDataSegmentPusher; +import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig; import io.druid.timeline.DataSegment; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.LinearShardSpec; import io.druid.timeline.partition.NoneShardSpec; +import io.druid.timeline.partition.NumberedShardSpec; +import io.druid.timeline.partition.PartitionChunk; +import io.druid.timeline.partition.ShardSpec; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -64,16 +50,43 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.util.StringUtils; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; 
+import com.google.common.collect.Ordering; +import com.google.common.io.CharStreams; +import com.metamx.common.MapUtils; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.core.NoopEmitter; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.InputStreamResponseHandler; + import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; +import org.joda.time.Interval; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.PreparedBatch; +import org.skife.jdbi.v2.Query; +import org.skife.jdbi.v2.ResultIterator; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionStatus; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.ByteArrayMapper; import org.slf4j.Logger; @@ -98,6 +111,8 @@ import java.util.TimeZone; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + /** * Utils class for Druid storage handler. */ @@ -345,8 +360,7 @@ public final class DruidStorageHandlerUtils { ) { try { if (!getAllDataSourceNames(connector, metadataStorageTablesConfig).contains(dataSource)) { - DruidStorageHandler.LOG - .warn(String.format("Cannot delete data source [%s], does not exist", dataSource)); + LOG.warn("Cannot delete data source {}, does not exist", dataSource); return false; } @@ -361,68 +375,153 @@ public final class DruidStorageHandlerUtils { ); } catch (Exception e) { - DruidStorageHandler.LOG.error(String.format("Error removing dataSource %s", dataSource), e); + LOG.error(String.format("Error removing dataSource %s", dataSource), e); return false; } return true; } public static void publishSegments(final SQLMetadataConnector connector, - final MetadataStorageTablesConfig metadataStorageTablesConfig, - final String dataSource, - final List segments, final ObjectMapper mapper, boolean overwrite) - { - connector.getDBI().inTransaction( - new TransactionCallback() - { - @Override - public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception - { - if(overwrite){ - disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource); - } - final PreparedBatch batch = handle.prepareBatch( - String.format( - "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - metadataStorageTablesConfig.getSegmentsTable() - ) - ); - for (final DataSegment segment : segments) { - - batch.add( - new ImmutableMap.Builder() - .put("id", segment.getIdentifier()) - .put("dataSource", segment.getDataSource()) - .put("created_date", new DateTime().toString()) - .put("start", segment.getInterval().getStart().toString()) - .put("end", segment.getInterval().getEnd().toString()) - .put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 
false : true) - .put("version", segment.getVersion()) - .put("used", true) - .put("payload", mapper.writeValueAsBytes(segment)) - .build() - ); - - LOG.info("Published %s", segment.getIdentifier()); + final MetadataStorageTablesConfig metadataStorageTablesConfig, + final String dataSource, + final List segments, + boolean overwrite, + String segmentDirectory, + Configuration conf) { + try { + connector.getDBI().inTransaction( + new TransactionCallback() { + @Override + public Void inTransaction(Handle handle, TransactionStatus transactionStatus) + throws Exception { + final List finalSegmentsToPublish = Lists.newArrayList(); + VersionedIntervalTimeline timeline; + if (overwrite) { + disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource); + // When overwriting start with empty timeline, as we are overwriting segments with new versions + timeline = new VersionedIntervalTimeline<>( + Ordering.natural() + ); + } else { + // Append Mode - build a timeline of existing segments in metadata storage. + Interval indexedInterval = JodaUtils + .umbrellaInterval(Iterables.transform(segments, + new Function() { + @Override + public Interval apply(@Nullable DataSegment input) { + return input.getInterval(); + } + })); + timeline = getTimelineForIntervalWithHandle( + handle, dataSource, indexedInterval, metadataStorageTablesConfig); + } + for (DataSegment segment : segments) { + List> existingChunks = timeline + .lookup(segment.getInterval()); + if (existingChunks.size() > 1) { + // Not possible to expand since we have more than one chunk with a single segment. + // This is the case when user wants to append a segment with coarser granularity. + // e.g If metadata storage already has segments for with granularity HOUR and segments to append have DAY granularity. + // Druid shard specs does not support multiple partitions for same interval with different granularity. + throw new IllegalStateException( + String.format( + "Cannot allocate new segment for dataSource[%s], interval[%s], already have [%,d] chunks. Not possible to append new segment.", + dataSource, + segment.getInterval(), + existingChunks.size() + ) + ); + } + // Find out the segment with latest version and maximum partition number + SegmentIdentifier max = null; + final ShardSpec newShardSpec; + final String newVersion; + if (!existingChunks.isEmpty()) { + // Some existing chunk, Find max + TimelineObjectHolder existingHolder = Iterables + .getOnlyElement(existingChunks); + for (PartitionChunk existing : existingHolder.getObject()) { + if (max == null || + max.getShardSpec().getPartitionNum() < existing.getObject() + .getShardSpec() + .getPartitionNum()) { + max = SegmentIdentifier.fromDataSegment(existing.getObject()); + } + } + } - } - batch.execute(); + if (max == null) { + // No existing shard present in the database, use the current version. 
+ newShardSpec = segment.getShardSpec(); + newVersion = segment.getVersion(); + } else { + // use version of existing max segment to generate new shard spec + newShardSpec = getNextPartitionShardSpec(max.getShardSpec()); + newVersion = max.getVersion(); + } - return null; - } - } - ); + DataSegment publishedSegment = publishSegmentWithShardSpec(segment, + newShardSpec, newVersion, + segmentDirectory, getPath(segment).getFileSystem(conf)); + finalSegmentsToPublish.add(publishedSegment); + timeline.add(publishedSegment.getInterval(), publishedSegment.getVersion(), + publishedSegment.getShardSpec().createChunk(publishedSegment)); + + } + + // Publish new segments to metadata storage + final PreparedBatch batch = handle.prepareBatch( + String.format( + "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + metadataStorageTablesConfig.getSegmentsTable() + ) + + ); + + for (final DataSegment segment : finalSegmentsToPublish) { + + batch.add( + new ImmutableMap.Builder() + .put("id", segment.getIdentifier()) + .put("dataSource", segment.getDataSource()) + .put("created_date", new DateTime().toString()) + .put("start", segment.getInterval().getStart().toString()) + .put("end", segment.getInterval().getEnd().toString()) + .put("partitioned", + (segment.getShardSpec() instanceof NoneShardSpec) ? + false : + true) + .put("version", segment.getVersion()) + .put("used", true) + .put("payload", JSON_MAPPER.writeValueAsBytes(segment)) + .build() + ); + + LOG.info("Published {}", segment.getIdentifier()); + + } + batch.execute(); + + return null; + } + } + ); + } catch (CallbackFailedException e) { + LOG.error("Exception while publishing segments", e.getCause()); + throw Throwables.propagate(e.getCause()); + } } - public static void disableDataSourceWithHandle(Handle handle, MetadataStorageTablesConfig metadataStorageTablesConfig, String dataSource){ + public static void disableDataSourceWithHandle(Handle handle, + MetadataStorageTablesConfig metadataStorageTablesConfig, String dataSource) { handle.createStatement( - String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", - metadataStorageTablesConfig.getSegmentsTable() - ) + String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", + metadataStorageTablesConfig.getSegmentsTable() + ) ) - .bind("dataSource", dataSource) - .execute(); + .bind("dataSource", dataSource) + .execute(); } /** @@ -537,4 +636,113 @@ public final class DruidStorageHandlerUtils { conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); } + private static VersionedIntervalTimeline getTimelineForIntervalWithHandle( + final Handle handle, + final String dataSource, + final Interval interval, + final MetadataStorageTablesConfig dbTables + ) throws IOException { + Query> sql = handle.createQuery( + String.format( + "SELECT payload FROM %s WHERE used = true AND dataSource = ? AND start <= ? 
AND \"end\" >= ?", + dbTables.getSegmentsTable() + ) + ).bind(0, dataSource) + .bind(1, interval.getEnd().toString()) + .bind(2, interval.getStart().toString()); + + final VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>( + Ordering.natural() + ); + final ResultIterator dbSegments = sql + .map(ByteArrayMapper.FIRST) + .iterator(); + try { + while (dbSegments.hasNext()) { + final byte[] payload = dbSegments.next(); + DataSegment segment = JSON_MAPPER.readValue( + payload, + DataSegment.class + ); + timeline.add(segment.getInterval(), segment.getVersion(), + segment.getShardSpec().createChunk(segment)); + } + } finally { + dbSegments.close(); + } + return timeline; + } + + public static DataSegmentPusher createSegmentPusherForDirectory(String segmentDirectory, + Configuration configuration) throws IOException { + final HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = new HdfsDataSegmentPusherConfig(); + hdfsDataSegmentPusherConfig.setStorageDirectory(segmentDirectory); + return new HdfsDataSegmentPusher( + hdfsDataSegmentPusherConfig, configuration, JSON_MAPPER); + } + + public static DataSegment publishSegmentWithShardSpec(DataSegment segment, ShardSpec shardSpec, + String version, String segmentDirectory, FileSystem fs) + throws IOException { + boolean retry = true; + DataSegment.Builder dataSegmentBuilder = new DataSegment.Builder(segment).version(version); + Path finalPath = null; + while (retry) { + retry = false; + dataSegmentBuilder.shardSpec(shardSpec); + final Path intermediatePath = getPath(segment); + finalPath = finalPathForSegment(segmentDirectory, dataSegmentBuilder.build()); + + // Create parent if it does not exist, recreation is not an error + fs.mkdirs(finalPath.getParent()); + + if (!fs.rename(intermediatePath, finalPath)) { + if (fs.exists(finalPath)) { + // Someone else is also trying to append + shardSpec = getNextPartitionShardSpec(shardSpec); + retry = true; + } else { + throw new IOException(String.format( + "Failed to rename intermediate segment[%s] to final segment[%s] is not present.", + intermediatePath, + finalPath + )); + } + } + } + DataSegment dataSegment = dataSegmentBuilder + .loadSpec(ImmutableMap.of("type", "hdfs", "path", finalPath.toString())) + .build(); + + writeSegmentDescriptor(fs, dataSegment, new Path(finalPath.getParent(), "descriptor.json")); + + return dataSegment; + } + + public static Path finalPathForSegment(String segmentDirectory, DataSegment segment) { + return new Path( + String.format("%s/%s/index.zip", segmentDirectory, + DataSegmentPusherUtil.getHdfsStorageDir(segment))); + } + + private static ShardSpec getNextPartitionShardSpec(ShardSpec shardSpec) { + if (shardSpec instanceof LinearShardSpec) { + return new LinearShardSpec(shardSpec.getPartitionNum() + 1); + } else if (shardSpec instanceof NumberedShardSpec) { + return new NumberedShardSpec(shardSpec.getPartitionNum(), + ((NumberedShardSpec) shardSpec).getPartitions()); + } else { + // Druid only support appending more partitions to Linear and Numbered ShardSpecs. 
+ throw new IllegalStateException( + String.format( + "Cannot expand shard spec [%s]", + shardSpec + ) + ); + } + } + + public static Path getPath(DataSegment dataSegment) { + return new Path(String.valueOf(dataSegment.getLoadSpec().get("path"))); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/4a8eaa57/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java index 31db86a..5e1deac 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java @@ -39,10 +39,8 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; -import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.plumber.CustomVersioningPolicy; -import io.druid.storage.hdfs.HdfsDataSegmentPusher; -import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig; + import org.apache.calcite.adapter.druid.DruidTable; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; @@ -63,6 +61,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.util.Progressable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,15 +93,7 @@ public class DruidOutputFormat implements HiveOutputFormat implements HiveOutputFormat existingSegments = Arrays.asList(DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v0") - .interval(new Interval(1, 10)).shardSpec(NoneShardSpec.instance()).build()); - DruidStorageHandlerUtils.publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, - existingSegments, - DruidStorageHandlerUtils.JSON_MAPPER, - true - ); + List existingSegments = Arrays + .asList(createSegment(new Path(taskDirPath, "index_old.zip").toString(), + new Interval(100, 150), "v0", new LinearShardSpec(0))); + DruidStorageHandlerUtils + .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + existingSegments, + true, + taskDirPath.toString(), + config + ); DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); druidStorageHandler.commitInsertTable(tableMock, true); Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( - DruidStorageHandlerUtils.getAllDataSourceNames(connector, - metadataStorageTablesConfig - )).toArray()); - - final List dataSegmentList = connector.getDBI() - .withHandle(new HandleCallback>() { - @Override - public List withHandle(Handle handle) throws Exception { - return handle - .createQuery(String.format("SELECT payload FROM %s WHERE used=true", - metadataStorageTablesConfig.getSegmentsTable())) - .map(new ResultSetMapper() { - - @Override - public DataSegment map(int i, ResultSet resultSet, - StatementContext statementContext) - throws SQLException { - try { - return DruidStorageHandlerUtils.JSON_MAPPER.readValue( - resultSet.getBytes("payload"), - DataSegment.class - ); - } catch (IOException e) { - throw Throwables.propagate(e); - } - } - }).list(); - } - }); + 
DruidStorageHandlerUtils.getAllDataSourceNames(connector, + metadataStorageTablesConfig + )).toArray()); + + final List dataSegmentList = getUsedSegmentsList(connector, + metadataStorageTablesConfig); Assert.assertEquals(1, dataSegmentList.size()); + DataSegment persistedSegment = Iterables.getOnlyElement(dataSegmentList); + Assert.assertEquals(dataSegment, persistedSegment); + Assert.assertEquals(dataSegment.getVersion(), persistedSegment.getVersion()); + String expectedFinalPath = DruidStorageHandlerUtils.finalPathForSegment( + config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), persistedSegment) + .toString(); + Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalPath), + persistedSegment.getLoadSpec()); + Assert.assertEquals("dummySegmentData", + FileUtils.readFileToString(new File(expectedFinalPath))); + } + + private List getUsedSegmentsList(DerbyConnectorTestUtility connector, + final MetadataStorageTablesConfig metadataStorageTablesConfig) { + return connector.getDBI() + .withHandle(new HandleCallback>() { + @Override + public List withHandle(Handle handle) throws Exception { + return handle + .createQuery(String.format( + "SELECT payload FROM %s WHERE used=true ORDER BY created_date ASC", + metadataStorageTablesConfig.getSegmentsTable())) + .map(new ResultSetMapper() { + + @Override + public DataSegment map(int i, ResultSet resultSet, + StatementContext statementContext) + throws SQLException { + try { + return DruidStorageHandlerUtils.JSON_MAPPER.readValue( + resultSet.getBytes("payload"), + DataSegment.class + ); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + }).list(); + } + }); + } + + @Test + public void testCommitInsertIntoTable() throws MetaException, IOException { + DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector(); + MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule + .metadataTablesConfigSupplier().get(); + druidStorageHandler.preCreateTable(tableMock); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + List existingSegments = Arrays + .asList(createSegment(new Path(taskDirPath, "index_old.zip").toString(), + new Interval(100, 150), "v0", new LinearShardSpec(1))); + DruidStorageHandlerUtils + .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + existingSegments, + true, + taskDirPath.toString(), + config + ); + DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), + new Interval(100, 150), "v1", new LinearShardSpec(0)); + Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + ); + DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); + druidStorageHandler.commitInsertTable(tableMock, false); + Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( + DruidStorageHandlerUtils.getAllDataSourceNames(connector, + metadataStorageTablesConfig + )).toArray()); + + final List dataSegmentList = getUsedSegmentsList(connector, + metadataStorageTablesConfig); + Assert.assertEquals(2, dataSegmentList.size()); + + DataSegment persistedSegment = dataSegmentList.get(1); + // Insert into appends to old version + Assert.assertEquals("v0", persistedSegment.getVersion()); + Assert.assertTrue(persistedSegment.getShardSpec() instanceof 
LinearShardSpec); + Assert.assertEquals(2, persistedSegment.getShardSpec().getPartitionNum()); + String expectedFinalPath = DruidStorageHandlerUtils.finalPathForSegment( + config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), persistedSegment) + .toString(); + Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalPath), + persistedSegment.getLoadSpec()); + Assert.assertEquals("dummySegmentData", + FileUtils.readFileToString(new File(expectedFinalPath))); + } + + @Test + public void testCommitInsertIntoWhenDestinationSegmentFileExist() + throws MetaException, IOException { + DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector(); + MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule + .metadataTablesConfigSupplier().get(); + druidStorageHandler.preCreateTable(tableMock); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + List existingSegments = Arrays + .asList(createSegment(new Path(taskDirPath, "index_old.zip").toString(), + new Interval(100, 150), "v0", new LinearShardSpec(1))); + DruidStorageHandlerUtils + .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + existingSegments, + true, + taskDirPath.toString(), + config + ); + DataSegment dataSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), + new Interval(100, 150), "v1", new LinearShardSpec(0)); + Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment, + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + ); + DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath); + + // Create segment file at the destination location with LinearShardSpec(2) + FileUtils.writeStringToFile(new File(DruidStorageHandlerUtils.finalPathForSegment( + config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), + createSegment(new Path(taskDirPath, "index_conflict.zip").toString(), + new Interval(100, 150), "v1", new LinearShardSpec(1))).toString()), "dummy"); + druidStorageHandler.commitInsertTable(tableMock, false); + Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList( + DruidStorageHandlerUtils.getAllDataSourceNames(connector, + metadataStorageTablesConfig + )).toArray()); + + final List dataSegmentList = getUsedSegmentsList(connector, + metadataStorageTablesConfig); + Assert.assertEquals(2, dataSegmentList.size()); + + DataSegment persistedSegment = dataSegmentList.get(1); + // Insert into appends to old version + Assert.assertEquals("v0", persistedSegment.getVersion()); + Assert.assertTrue(persistedSegment.getShardSpec() instanceof LinearShardSpec); + // insert into should skip and increment partition number to 3 + Assert.assertEquals(2, persistedSegment.getShardSpec().getPartitionNum()); + String expectedFinalPath = DruidStorageHandlerUtils.finalPathForSegment( + config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)), persistedSegment) + .toString(); + Assert.assertEquals(ImmutableMap.of("type", "hdfs", "path", expectedFinalPath), + persistedSegment.getLoadSpec()); + Assert.assertEquals("dummySegmentData", + FileUtils.readFileToString(new File(expectedFinalPath))); + } + + @Test(expected = IllegalStateException.class) + public void testCommitInsertIntoWithConflictingIntervalSegment() + throws MetaException, IOException { + DerbyConnectorTestUtility connector = 
derbyConnectorRule.getConnector(); + MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule + .metadataTablesConfigSupplier().get(); + druidStorageHandler.preCreateTable(tableMock); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + List existingSegments = Arrays.asList( + createSegment(new Path(taskDirPath, "index_old_1.zip").toString(), + new Interval(100, 150), + "v0", new LinearShardSpec(0)), + createSegment(new Path(taskDirPath, "index_old_2.zip").toString(), + new Interval(150, 200), + "v0", new LinearShardSpec(0)), + createSegment(new Path(taskDirPath, "index_old_3.zip").toString(), + new Interval(200, 300), + "v0", new LinearShardSpec(0))); + DruidStorageHandlerUtils + .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + existingSegments, + true, + taskDirPath.toString(), + config + ); + + // Try appending segment with conflicting interval + DataSegment conflictingSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), + new Interval(100, 300), "v1", new LinearShardSpec(0)); + Path descriptorPath = DruidStorageHandlerUtils + .makeSegmentDescriptorOutputPath(conflictingSegment, + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + ); + DruidStorageHandlerUtils + .writeSegmentDescriptor(localFileSystem, conflictingSegment, descriptorPath); + druidStorageHandler.commitInsertTable(tableMock, false); + } + + @Test(expected = IllegalStateException.class) + public void testCommitInsertIntoWithNonExtendableSegment() throws MetaException, IOException { + DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector(); + MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule + .metadataTablesConfigSupplier().get(); + druidStorageHandler.preCreateTable(tableMock); + LocalFileSystem localFileSystem = FileSystem.getLocal(config); + Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName()); + List existingSegments = Arrays + .asList(createSegment(new Path(taskDirPath, "index_old_1.zip").toString(), + new Interval(100, 150), "v0", new NoneShardSpec()), + createSegment(new Path(taskDirPath, "index_old_2.zip").toString(), + new Interval(200, 250), "v0", new LinearShardSpec(0)), + createSegment(new Path(taskDirPath, "index_old_3.zip").toString(), + new Interval(250, 300), "v0", new LinearShardSpec(0))); + DruidStorageHandlerUtils + .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME, + existingSegments, + true, + taskDirPath.toString(), + config + ); + + // Try appending to non extendable shard spec + DataSegment conflictingSegment = createSegment(new Path(taskDirPath, "index.zip").toString(), + new Interval(100, 150), "v1", new LinearShardSpec(0)); + Path descriptorPath = DruidStorageHandlerUtils + .makeSegmentDescriptorOutputPath(conflictingSegment, + new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME) + ); + DruidStorageHandlerUtils + .writeSegmentDescriptor(localFileSystem, conflictingSegment, descriptorPath); + + druidStorageHandler.commitInsertTable(tableMock, false); } + }
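----------------------------------------------------------------------
The append path this patch adds to DruidStorageHandlerUtils.publishSegments reduces to one rule: after an INSERT OVERWRITE the old data source is disabled and a new segment is published with its own version and shard spec, while an INSERT INTO keeps the version of the newest existing segment for the interval and takes the next partition number. Below is a minimal, dependency-free sketch of that rule; SegmentKey, keyForNewSegment, and the values in main are illustrative only, not part of the patch.

    import java.util.Arrays;
    import java.util.List;

    /** Illustrative stand-in for the (version, partitionNum) pair tracked per segment. */
    final class SegmentKey {
      final String version;
      final int partitionNum;

      SegmentKey(String version, int partitionNum) {
        this.version = version;
        this.partitionNum = partitionNum;
      }
    }

    final class AppendRuleSketch {
      /**
       * existing: segments already published for the target interval (empty after an
       * INSERT OVERWRITE, because publishSegments disables the old data source first).
       * fresh: the version/partition stamped on the freshly written segment.
       */
      static SegmentKey keyForNewSegment(List<SegmentKey> existing, SegmentKey fresh) {
        if (existing.isEmpty()) {
          // No existing shard for the interval: the segment is published unchanged.
          return fresh;
        }
        SegmentKey max = existing.get(0);
        for (SegmentKey k : existing) {
          if (k.partitionNum > max.partitionNum) {
            max = k;
          }
        }
        // Append: reuse the newest existing version and take the next partition number,
        // as getNextPartitionShardSpec does for a LinearShardSpec (partitionNum + 1).
        return new SegmentKey(max.version, max.partitionNum + 1);
      }

      public static void main(String[] args) {
        // Mirrors testCommitInsertIntoTable: existing v0/partition 1, fresh v1/partition 0.
        List<SegmentKey> existing = Arrays.asList(new SegmentKey("v0", 1));
        SegmentKey next = keyForNewSegment(existing, new SegmentKey("v1", 0));
        System.out.println(next.version + " / " + next.partitionNum); // prints "v0 / 2"
      }
    }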
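----------------------------------------------------------------------
publishSegmentWithShardSpec moves each intermediate index.zip to its final location with a plain rename, and treats a failed rename whose target already exists as a concurrent append: it bumps the partition number and retries with the next final path, giving up only when the rename fails with no competing file in place. The sketch below replays that loop on a local filesystem with java.nio.file; the <segmentDir>/<partitionNum>/index.zip layout is a simplification of the HDFS path the patch derives via DataSegmentPusherUtil.getHdfsStorageDir.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    final class RenameWithRetrySketch {

      /** Moves the intermediate file into place; returns the partition number it landed on. */
      static int moveIntoPlace(Path intermediate, Path segmentDir, int partitionNum)
          throws IOException {
        while (true) {
          Path finalPath = segmentDir.resolve(Integer.toString(partitionNum)).resolve("index.zip");
          Files.createDirectories(finalPath.getParent()); // recreating the parent is not an error
          try {
            // No REPLACE_EXISTING: the move must fail if another writer already took this slot.
            Files.move(intermediate, finalPath);
            return partitionNum;
          } catch (IOException e) {
            if (Files.exists(finalPath)) {
              // Someone else appended concurrently; skip to the next partition and retry.
              partitionNum++;
            } else {
              throw new IOException(
                  "Failed to rename " + intermediate + " to " + finalPath, e);
            }
          }
        }
      }
    }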
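----------------------------------------------------------------------
Two append requests are rejected outright, and the new tests pin both down: a segment whose interval spans more than one existing chunk (for example a DAY segment over existing HOUR segments), and a segment next to a shard spec that cannot grow, since getNextPartitionShardSpec only extends LinearShardSpec and NumberedShardSpec. A compact sketch of those two checks, with an illustrative boolean standing in for the shard-spec type test:

    import java.util.List;

    final class AppendValidationSketch {

      static void validateAppend(List<?> existingChunksForInterval,
          boolean shardSpecIsLinearOrNumbered) {
        if (existingChunksForInterval.size() > 1) {
          // More than one chunk under the new segment's interval: a coarser-granularity
          // segment cannot be expressed as one extra partition of a single interval.
          throw new IllegalStateException("Cannot allocate new segment, interval already has "
              + existingChunksForInterval.size() + " chunks");
        }
        if (!shardSpecIsLinearOrNumbered) {
          // Only Linear and Numbered shard specs accept additional partitions.
          throw new IllegalStateException("Cannot expand shard spec");
        }
      }
    }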