From: nagarwal@apache.org
To: commits@hudi.apache.org
Date: Fri, 14 Feb 2020 18:58:36 +0000
Subject: [incubator-hudi] branch master updated: [HUDI-571] Add show archived compaction(s) to CLI

This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 20ed251  [HUDI-571] Add show archived compaction(s) to CLI
20ed251 is described below

commit 20ed2516d38b9ce4b3e185bd89b62264b8bd3f25
Author: Satish Kotha
AuthorDate: Wed Jan 22 13:50:34 2020 -0800

    [HUDI-571] Add show archived compaction(s) to CLI
---
 .../apache/hudi/cli/commands/CommitsCommand.java   |  13 +-
 .../hudi/cli/commands/CompactionCommand.java       | 247 ++++++++++++++++-----
 .../java/org/apache/hudi/cli/utils/CommitUtil.java |  23 ++
 3 files changed, 216 insertions(+), 67 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 1e17c4c..804096b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -21,6 +21,7 @@ package org.apache.hudi.cli.commands;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.utils.CommitUtil;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -41,10 +42,8 @@ import org.springframework.shell.core.annotation.CliOption;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
-import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -186,10 +185,10 @@ public class CommitsCommand implements CommandMarker {
       final boolean headerOnly) throws IOException {
 
     if (StringUtils.isNullOrEmpty(startTs)) {
-      startTs = getTimeDaysAgo(10);
+      startTs = CommitUtil.getTimeDaysAgo(10);
     }
     if (StringUtils.isNullOrEmpty(endTs)) {
-      endTs = getTimeDaysAgo(1);
+      endTs = CommitUtil.getTimeDaysAgo(1);
     }
     HoodieArchivedTimeline archivedTimeline = HoodieCLI.getTableMetaClient().getArchivedTimeline();
     try {
@@ -362,10 +361,4 @@ public class CommitsCommand implements CommandMarker {
     return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
         + HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
   }
-
-  private String getTimeDaysAgo(int numberOfDays) {
-    Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
-    return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
-  }
-
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 0b57947..2564931 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -26,16 +26,20 @@ import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
+import org.apache.hudi.cli.utils.CommitUtil;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.util.AvroUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.func.OperationResult;
@@ -61,8 +65,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * CLI command to display compaction related options.
  */
@@ -95,51 +101,9 @@ public class CompactionCommand implements CommandMarker {
       throws IOException {
     HoodieTableMetaClient client = checkAndGetMetaClient();
     HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionTimeline();
-    HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
-    Set<String> committed = commitTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
-
-    List<HoodieInstant> instants = timeline.getReverseOrderedInstants().collect(Collectors.toList());
-    List<Comparable[]> rows = new ArrayList<>();
-    for (HoodieInstant instant : instants) {
-      HoodieCompactionPlan compactionPlan = null;
-      if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
-        try {
-          // This could be a completed compaction. Assume a compaction request file is present but skip if fails
-          compactionPlan = AvroUtils.deserializeCompactionPlan(
-              activeTimeline.readCompactionPlanAsBytes(
-                  HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
-        } catch (HoodieIOException ioe) {
-          // SKIP
-        }
-      } else {
-        compactionPlan = AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(
-            HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
-      }
-
-      if (null != compactionPlan) {
-        State state = instant.getState();
-        if (committed.contains(instant.getTimestamp())) {
-          state = State.COMPLETED;
-        }
-        if (includeExtraMetadata) {
-          rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
-              compactionPlan.getOperations() == null ? 0 : compactionPlan.getOperations().size(),
-              compactionPlan.getExtraMetadata().toString()});
-        } else {
-          rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
-              compactionPlan.getOperations() == null ? 0 : compactionPlan.getOperations().size()});
-        }
-      }
-    }
-
-    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
-        .addTableHeaderField("Total FileIds to be Compacted");
-    if (includeExtraMetadata) {
-      header = header.addTableHeaderField("Extra Metadata");
-    }
-    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+    return printAllCompactions(activeTimeline,
+        compactionPlanReader(this::readCompactionPlanForActiveTimeline, activeTimeline),
+        includeExtraMetadata, sortByField, descending, limit, headerOnly);
   }
 
   @CliCommand(value = "compaction show", help = "Shows compaction details for a specific compaction instant")
@@ -159,19 +123,68 @@ public class CompactionCommand implements CommandMarker {
         activeTimeline.readCompactionPlanAsBytes(
             HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
 
-    List<Comparable[]> rows = new ArrayList<>();
-    if ((null != compactionPlan) && (null != compactionPlan.getOperations())) {
-      for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
-        rows.add(new Comparable[] {op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(),
-            op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()});
-      }
+    return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
+  }
+
+  @CliCommand(value = "compactions show archived", help = "Shows compaction details for specified time window")
+  public String compactionShowArchived(
+      @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
+          unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
+      @CliOption(key = {"startTs"}, mandatory = false, help = "start time for compactions, default: now - 10 days")
+          String startTs,
+      @CliOption(key = {"endTs"}, mandatory = false, help = "end time for compactions, default: now - 1 day")
+          String endTs,
+      @CliOption(key = {"limit"}, help = "Limit compactions",
+          unspecifiedDefaultValue = "-1") final Integer limit,
+      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
+      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
+      @CliOption(key = {"headeronly"}, help = "Print Header Only",
+          unspecifiedDefaultValue = "false") final boolean headerOnly)
+      throws Exception {
+    if (StringUtils.isNullOrEmpty(startTs)) {
+      startTs = CommitUtil.getTimeDaysAgo(10);
+    }
+    if (StringUtils.isNullOrEmpty(endTs)) {
+      endTs = CommitUtil.getTimeDaysAgo(1);
     }
-    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File Id")
-        .addTableHeaderField("Base Instant").addTableHeaderField("Data File Path")
-        .addTableHeaderField("Total Delta Files").addTableHeaderField("getMetrics");
-    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+    HoodieTableMetaClient client = checkAndGetMetaClient();
+    HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
+    archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
+    try {
+      return printAllCompactions(archivedTimeline,
+          compactionPlanReader(this::readCompactionPlanForArchivedTimeline, archivedTimeline),
+          includeExtraMetadata, sortByField, descending, limit, headerOnly);
+    } finally {
+      archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
+    }
+  }
+
+  @CliCommand(value = "compaction show archived", help = "Shows compaction details for a specific compaction instant")
+  public String compactionShowArchived(
+      @CliOption(key = "instant", mandatory = true,
+          help = "instant time") final String compactionInstantTime,
+      @CliOption(key = {"limit"}, help = "Limit commits",
+          unspecifiedDefaultValue = "-1") final Integer limit,
+      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
+      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
+      @CliOption(key = {"headeronly"}, help = "Print Header Only",
+          unspecifiedDefaultValue = "false") final boolean headerOnly)
+      throws Exception {
+    HoodieTableMetaClient client = checkAndGetMetaClient();
+    HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
+    HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED,
+        HoodieTimeline.COMPACTION_ACTION, compactionInstantTime);
+    String startTs = CommitUtil.addHours(compactionInstantTime, -1);
+    String endTs = CommitUtil.addHours(compactionInstantTime, 1);
+    try {
+      archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
+      HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
+          archivedTimeline.getInstantDetails(instant).get());
+      return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
+    } finally {
+      archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
+    }
   }
 
   @CliCommand(value = "compaction schedule", help = "Schedule Compaction")
@@ -249,6 +262,126 @@ public class CompactionCommand implements CommandMarker {
     return "Compaction successfully completed for " + compactionInstantTime;
   }
 
+  /**
+   * Prints all compaction details.
+   */
+  private String printAllCompactions(HoodieDefaultTimeline timeline,
+                                     Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader,
+                                     boolean includeExtraMetadata,
+                                     String sortByField,
+                                     boolean descending,
+                                     int limit,
+                                     boolean headerOnly) {
+
+    Stream<HoodieInstant> instantsStream = timeline.getCommitsAndCompactionTimeline().getReverseOrderedInstants();
+    List<Pair<HoodieInstant, HoodieCompactionPlan>> compactionPlans = instantsStream
+        .map(instant -> Pair.of(instant, compactionPlanReader.apply(instant)))
+        .filter(pair -> pair.getRight() != null)
+        .collect(Collectors.toList());
+
+    Set<HoodieInstant> committedInstants = timeline.getCommitTimeline().filterCompletedInstants()
+        .getInstants().collect(Collectors.toSet());
+
+    List<Comparable[]> rows = new ArrayList<>();
+    for (Pair<HoodieInstant, HoodieCompactionPlan> compactionPlan : compactionPlans) {
+      HoodieCompactionPlan plan = compactionPlan.getRight();
+      HoodieInstant instant = compactionPlan.getLeft();
+      final HoodieInstant.State state;
+      if (committedInstants.contains(instant)) {
+        state = HoodieInstant.State.COMPLETED;
+      } else {
+        state = instant.getState();
+      }
+
+      if (includeExtraMetadata) {
+        rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
+            plan.getOperations() == null ? 0 : plan.getOperations().size(),
+            plan.getExtraMetadata().toString()});
+      } else {
+        rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
+            plan.getOperations() == null ? 0 : plan.getOperations().size()});
+      }
+    }
+
+    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
+    TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
+        .addTableHeaderField("Total FileIds to be Compacted");
+    if (includeExtraMetadata) {
+      header = header.addTableHeaderField("Extra Metadata");
+    }
+    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+  }
+
+  /**
+   * Compaction reading is different for different timelines. Create partial function to override special logic.
+   * We can make these read methods part of HoodieDefaultTimeline and override where necessary. But the
+   * BiFunction below has 'hacky' exception blocks, so restricting it to CLI.
+   */
+  private <T extends HoodieDefaultTimeline>
+      Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader(
+      BiFunction<T, HoodieInstant, HoodieCompactionPlan> f, T timeline) {
+
+    return (y) -> f.apply(timeline, y);
+  }
+
+  private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline,
+                                                                     HoodieInstant instant) {
+    if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
+      return null;
+    } else {
+      try {
+        return AvroUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * TBD Can we make this part of HoodieActiveTimeline or a utility class.
+   */
+  private HoodieCompactionPlan readCompactionPlanForActiveTimeline(HoodieActiveTimeline activeTimeline,
+                                                                   HoodieInstant instant) {
+    try {
+      if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
+        try {
+          // This could be a completed compaction. Assume a compaction request file is present but skip if fails
+          return AvroUtils.deserializeCompactionPlan(
+              activeTimeline.readCompactionPlanAsBytes(
                  HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
+        } catch (HoodieIOException ioe) {
+          // SKIP
+          return null;
+        }
+      } else {
+        return AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(
+            HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException(e.getMessage(), e);
+    }
+  }
+
+  private String printCompaction(HoodieCompactionPlan compactionPlan,
+                                 String sortByField,
+                                 boolean descending,
+                                 int limit,
+                                 boolean headerOnly) {
+    List<Comparable[]> rows = new ArrayList<>();
+    if ((null != compactionPlan) && (null != compactionPlan.getOperations())) {
+      for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
+        rows.add(new Comparable[]{op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(),
+            op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()});
+      }
+    }
+
+    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
+    TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File Id")
+        .addTableHeaderField("Base Instant").addTableHeaderField("Data File Path")
+        .addTableHeaderField("Total Delta Files").addTableHeaderField("getMetrics");
+    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+  }
+
   private static String getTmpSerializerFile() {
     return TMP_DIR + UUID.randomUUID().toString() + ".ser";
   }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
index aaaeb35..60d3e3e 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -21,9 +21,15 @@ package org.apache.hudi.cli.utils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 
 import java.io.IOException;
+import java.text.ParseException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Date;
 import java.util.List;
 
 /**
@@ -42,4 +48,21 @@ public class CommitUtil {
     }
     return totalNew;
   }
+
+  public static String getTimeDaysAgo(int numberOfDays) {
+    Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
+    return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
+  }
+
+  /**
+   * Add hours to specified time. If hours < 0, this acts as remove hours.
+   * For example, say compactionCommitTime: "20200202020000"
+   * a) hours: +1, returns 20200202030000
+   * b) hours: -1, returns 20200202010000
+   */
+  public static String addHours(String compactionCommitTime, int hours) throws ParseException {
+    Instant instant = HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).toInstant();
+    ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
+    return HoodieActiveTimeline.COMMIT_FORMATTER.format(Date.from(commitDateTime.plusHours(hours).toInstant()));
+  }
 }