From commits-return-7301-archive-asf-public=cust-asf.ponee.io@kudu.apache.org Tue Apr 23 17:15:30 2019
From: granthenke@apache.org
To: commits@kudu.apache.org
Reply-To: dev@kudu.apache.org
Date: Tue, 23 Apr 2019 17:15:31 +0000
Subject: [kudu] 03/03: [backup] Fix fromMs override option
Message-Id: <20190423171529.1AFD087A57@gitbox.apache.org>
In-Reply-To: <155603972830.3618.5909023398708846762@gitbox.apache.org>
X-Git-Repo: kudu
X-Git-Refname: refs/heads/master
X-Git-Rev: 2b023b979358cf3716c3b282ff21879bd97cec4a

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 2b023b979358cf3716c3b282ff21879bd97cec4a
Author: Grant Henke <granthenke@apache.org>
AuthorDate: Mon Apr 22 21:20:00 2019 -0500

    [backup] Fix fromMs override option

    Fix the logic for deciding when to look up old backups. Previously
    the fromMs argument wasn't being handled correctly.

    Additionally, this contains a small change to avoid converting back
    and forth between Path and HPath multiple times.

    Change-Id: I8a3abc47dd9d1441ba269dfc9405691f79e6615d
    Reviewed-on: http://gerrit.cloudera.org:8080/13080
    Tested-by: Kudu Jenkins
    Reviewed-by: Will Berkeley
---
 .../scala/org/apache/kudu/backup/KuduBackup.scala  | 14 +++---
 .../scala/org/apache/kudu/backup/Options.scala     |  7 +--
 .../scala/org/apache/kudu/backup/SessionIO.scala   |  7 ++-
 .../org/apache/kudu/backup/TestKuduBackup.scala    | 50 ++++++++++++++++++----
 4 files changed, 58 insertions(+), 20 deletions(-)
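In outline, the corrected decision the patch introduces is: an explicit forceFull wins, an explicit fromMs comes next, and only when neither is set is the backup graph consulted. The following is a minimal sketch of that logic, not the committed code: previousToMs stands in for the real BackupGraph lookup, and defaultFromMs mirrors BackupOptions.DefaultFromMS.

    // Sketch only: returns the resolved (forceFull, fromMs) pair.
    def resolveBackupMode(
        forceFull: Boolean,
        fromMs: Long,
        previousToMs: Option[Long], // toMs of an existing full backup, if any
        defaultFromMs: Long = 0): (Boolean, Long) = {
      if (forceFull) {
        (true, defaultFromMs) // Explicit full backup: ignore history and fromMs.
      } else if (fromMs != defaultFromMs) {
        (false, fromMs) // Caller-supplied fromMs wins: incremental diff scan.
      } else if (previousToMs.isDefined) {
        (false, previousToMs.get) // Chain off the previous backup's toMs.
      } else {
        (true, defaultFromMs) // No previous backup found: fall back to a full one.
      }
    }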
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index a8d89f6..445bef0 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -56,22 +56,26 @@ object KuduBackup {
 
     // Unless we are forcing a full backup or a fromMs was set, find the previous backup and
     // use the `to_ms` metadata as the `from_ms` time for this backup.
-    if (!tableOptions.forceFull || tableOptions.fromMs != 0) {
-      log.info("No fromMs option set, looking for a previous backup.")
+    if (tableOptions.forceFull) {
+      log.info("Performing a full backup, forceFull was set to true")
+    } else if (tableOptions.fromMs != BackupOptions.DefaultFromMS) {
+      log.info(s"Performing an incremental backup, fromMs was set to ${tableOptions.fromMs}")
+    } else {
+      log.info("Looking for a previous backup, forceFull or fromMs options are not set.")
       val graph = io.readBackupGraph(tableName)
       if (graph.hasFullBackup) {
         val base = graph.backupBase
         log.info(s"Setting fromMs to ${base.metadata.getToMs} from backup in path: ${base.path}")
         tableOptions = tableOptions.copy(fromMs = base.metadata.getToMs)
       } else {
-        log.info("No full backup was found. Starting a full backup.")
-        tableOptions = tableOptions.copy(fromMs = 0)
+        log.info("No previous backup was found. Starting a full backup.")
+        tableOptions = tableOptions.copy(forceFull = true)
       }
     }
 
     val rdd = new KuduBackupRDD(table, tableOptions, context, session.sparkContext)
     val df = session.sqlContext
-      .createDataFrame(rdd, io.dataSchema(table.getSchema, options.isIncremental))
+      .createDataFrame(rdd, io.dataSchema(table.getSchema, tableOptions.isIncremental))
 
     // Write the data to the backup path.
     // The backup path contains the timestampMs and should not already exist.
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
index a57931f..2874eac 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/Options.scala
@@ -39,7 +39,7 @@ case class BackupOptions(
     kuduMasterAddresses: String = InetAddress.getLocalHost.getCanonicalHostName,
     toMs: Long = System.currentTimeMillis(),
     forceFull: Boolean = BackupOptions.DefaultForceFull,
-    fromMs: Long = 0,
+    fromMs: Long = BackupOptions.DefaultFromMS,
     format: String = BackupOptions.DefaultFormat,
     scanBatchSize: Int = BackupOptions.DefaultScanBatchSize,
     scanRequestTimeoutMs: Long = BackupOptions.DefaultScanRequestTimeoutMs,
@@ -47,14 +47,15 @@ case class BackupOptions(
     keepAlivePeriodMs: Long = BackupOptions.DefaultKeepAlivePeriodMs)
     extends CommonOptions {
 
-  // If not forcing a full backup and fromMs is not zero, this is an incremental backup.
+  // If not forcing a full backup and fromMs is set, this is an incremental backup.
   def isIncremental: Boolean = {
    !forceFull && fromMs != BackupOptions.DefaultFromMS
   }
 }
 
 object BackupOptions {
   val DefaultForceFull: Boolean = false
+  val DefaultFromMS: Long = 0
   val DefaultFormat: String = "parquet"
   val DefaultScanBatchSize: Int = 1024 * 1024 * 20 // 20 MiB
   val DefaultScanRequestTimeoutMs: Long =
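With the DefaultFromMS sentinel in place, a caller forces an incremental backup simply by passing a nonzero fromMs. A hedged usage sketch follows; the field names match Options.scala above, while the table name, root path, and master address are illustrative:

    import org.apache.kudu.backup.BackupOptions

    // Sketch only: build options for an incremental backup from a known time.
    val opts = BackupOptions(
      tables = Seq("my_table"),             // illustrative table name
      rootPath = "hdfs:///kudu-backups",    // illustrative backup root
      kuduMasterAddresses = "master1:7051", // illustrative master address
      fromMs = 1556036400000L               // nonzero, so this is incremental
    )
    assert(opts.isIncremental) // forceFull defaults to false and fromMs is set.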
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
index 8977174..40f513f 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/SessionIO.scala
@@ -161,7 +161,7 @@ class SessionIO(val session: SparkSession, options: CommonOptions) {
         if (file.isDirectory) {
           val metadataHPath = new HPath(file.getPath, MetadataFileName)
           if (fs.exists(metadataHPath)) {
-            val metadata = readTableMetadata(Paths.get(metadataHPath.toString))
+            val metadata = readTableMetadata(metadataHPath)
             results += ((Paths.get(file.getPath.toString), metadata))
           }
         }
@@ -176,9 +176,8 @@ class SessionIO(val session: SparkSession, options: CommonOptions) {
    * @param metadataPath the path to the metadata file.
    * @return the deserialized table metadata.
    */
-  private def readTableMetadata(metadataPath: Path): TableMetadataPB = {
-    val hPath = new HPath(metadataPath.toString)
-    val in = new InputStreamReader(fs.open(hPath), StandardCharsets.UTF_8)
+  private def readTableMetadata(metadataPath: HPath): TableMetadataPB = {
+    val in = new InputStreamReader(fs.open(metadataPath), StandardCharsets.UTF_8)
     val json = CharStreams.toString(in)
     in.close()
     val builder = TableMetadataPB.newBuilder()
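The SessionIO change keeps the Hadoop Path type end to end instead of round-tripping through java.nio.file.Path. Below is a minimal sketch of the same read pattern, not the committed code; the file location is illustrative, and the Guava CharStreams call mirrors the one in the patch:

    import java.io.InputStreamReader
    import java.nio.charset.StandardCharsets

    import com.google.common.io.CharStreams
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path => HPath}

    // Sketch only: read a small UTF-8 file straight from a Hadoop Path.
    object ReadMetadataSketch {
      def main(args: Array[String]): Unit = {
        val fs = FileSystem.get(new Configuration())
        val metadataPath = new HPath("/tmp/backup/.kudu-metadata.json") // illustrative
        val in = new InputStreamReader(fs.open(metadataPath), StandardCharsets.UTF_8)
        try println(CharStreams.toString(in))
        finally in.close()
      }
    }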
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index a707702..8e1c6a2 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -19,6 +19,7 @@ package org.apache.kudu.backup
 import java.nio.file.Files
 import java.nio.file.Path
 import java.util
+import java.util.concurrent.TimeUnit
 
 import com.google.common.base.Objects
 import org.apache.commons.io.FileUtils
@@ -27,6 +28,7 @@ import org.apache.kudu.client._
 import org.apache.kudu.ColumnSchema
 import org.apache.kudu.Schema
 import org.apache.kudu.spark.kudu._
+import org.apache.kudu.test.CapturingLogAppender
 import org.apache.kudu.test.RandomUtils
 import org.apache.kudu.util.DataGenerator.DataGeneratorBuilder
 import org.apache.kudu.util.HybridTimeUtil
@@ -74,12 +76,12 @@ class TestKuduBackup extends KuduTestSuite {
     val rootDir = Files.createTempDirectory("backup")
     doBackup(rootDir, Seq(tableName)) // Full backup.
     insertRows(table, 100, 100) // Insert more data.
-    doBackup(rootDir, Seq(tableName), incremental = true) // Incremental backup.
+    doBackup(rootDir, Seq(tableName)) // Incremental backup.
 
     // Delete rows that span the full and incremental backup.
     Range(50, 150).foreach { i =>
       deleteRow(i)
     }
-    doBackup(rootDir, Seq(tableName), incremental = true) // Incremental backup.
+    doBackup(rootDir, Seq(tableName)) // Incremental backup.
     doRestore(rootDir, Seq(tableName)) // Restore all the backups.
     FileUtils.deleteDirectory(rootDir.toFile)
@@ -89,6 +91,28 @@ class TestKuduBackup extends KuduTestSuite {
   }
 
   @Test
+  def testForceIncrementalBackup() {
+    insertRows(table, 100) // Insert data into the default test table.
+    val beforeMs = getPropagatedTimestampMs
+    insertRows(table, 100, 100) // Insert more data.
+
+    val rootDir = Files.createTempDirectory("backup")
+
+    // Capture the logs to validate job internals.
+    val logs = new CapturingLogAppender()
+
+    // Force an incremental backup without a full backup.
+    // It will use a diff scan and won't check the existing dependency graph.
+    val handle = logs.attach()
+    doBackup(rootDir, Seq(tableName), fromMs = beforeMs) // Incremental backup.
+    handle.close()
+
+    assertTrue(Files.isDirectory(rootDir))
+    assertEquals(1, rootDir.toFile.list().length)
+    assertTrue(logs.getAppendedText.contains("fromMs was set"))
+  }
+
+  @Test
   def testSimpleBackupAndRestoreWithSpecialCharacters() {
     // Use an Impala-style table name to verify url encoding/decoding of the table name works.
     val impalaTableName = "impala::default.test"
@@ -294,7 +318,10 @@ class TestKuduBackup extends KuduTestSuite {
     FileUtils.deleteDirectory(rootDir.toFile)
   }
 
-  def doBackup(rootDir: Path, tableNames: Seq[String], incremental: Boolean = false): Unit = {
+  def doBackup(
+      rootDir: Path,
+      tableNames: Seq[String],
+      fromMs: Long = BackupOptions.DefaultFromMS): Unit = {
     val nowMs = System.currentTimeMillis()
 
     // Log the timestamps to simplify flaky debugging.
@@ -310,11 +337,11 @@ class TestKuduBackup extends KuduTestSuite {
     // millisecond value as nowMs (after truncating the micros) the records inserted in the
     // microseconds after truncation could be unread.
     val backupOptions = new BackupOptions(
-      tableNames,
-      rootDir.toUri.toString,
-      harness.getMasterAddressesAsString,
-      nowMs + 1,
-      incremental
+      tables = tableNames,
+      rootPath = rootDir.toUri.toString,
+      kuduMasterAddresses = harness.getMasterAddressesAsString,
+      toMs = nowMs + 1,
+      fromMs = fromMs
     )
     KuduBackup.run(backupOptions, ss)
   }
@@ -324,4 +351,11 @@ class TestKuduBackup extends KuduTestSuite {
       new RestoreOptions(tableNames, rootDir.toUri.toString, harness.getMasterAddressesAsString)
     KuduRestore.run(restoreOptions, ss)
   }
+
+  private def getPropagatedTimestampMs: Long = {
+    val propagatedTimestamp = harness.getClient.getLastPropagatedTimestamp
+    val physicalTimeMicros =
+      HybridTimeUtil.HTTimestampToPhysicalAndLogical(propagatedTimestamp).head
+    TimeUnit.MILLISECONDS.convert(physicalTimeMicros, TimeUnit.MICROSECONDS)
+  }
 }
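The new getPropagatedTimestampMs helper relies on Kudu hybrid-time timestamps carrying their physical component in microseconds. A standalone sketch of the same conversion follows; HybridTimeUtil and getLastPropagatedTimestamp both appear in the test above, while the client construction and master address are illustrative:

    import java.util.concurrent.TimeUnit

    import org.apache.kudu.client.KuduClient
    import org.apache.kudu.util.HybridTimeUtil

    // Sketch only: convert the last propagated hybrid-time timestamp to millis.
    object PropagatedTsSketch {
      def main(args: Array[String]): Unit = {
        val client = new KuduClient.KuduClientBuilder("master1:7051").build()
        try {
          // Note: this is -1 if no timestamp has been propagated to the client yet.
          val ht = client.getLastPropagatedTimestamp
          // HTTimestampToPhysicalAndLogical returns [physicalMicros, logicalCounter].
          val physicalMicros = HybridTimeUtil.HTTimestampToPhysicalAndLogical(ht).head
          val ms = TimeUnit.MILLISECONDS.convert(physicalMicros, TimeUnit.MICROSECONDS)
          println(s"last propagated timestamp = $ms ms since the epoch")
        } finally client.close()
      }
    }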