From: leesf@apache.org
To: commits@hudi.apache.org
Reply-To: dev@hudi.apache.org
Date: Sun, 15 Mar 2020 12:24:41 +0000
Subject: [incubator-hudi] branch master updated: [HUDI-344] Improve exporter tests (#1404)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 14323cb  [HUDI-344] Improve exporter tests (#1404)
14323cb is described below

commit 14323cb10012bdbf80cbb838928af9301cb42ba0
Author: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
AuthorDate: Sun Mar 15 05:24:30 2020 -0700

    [HUDI-344] Improve exporter tests (#1404)
---
 .../hudi/utilities/HoodieSnapshotExporter.java     |   9 +
 .../apache/hudi/utilities/DataSourceTestUtils.java |  50 ----
 .../hudi/utilities/TestHoodieSnapshotExporter.java | 318 +++++++++------------
 3 files changed, 151 insertions(+), 226 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index f785d74..b58b5d3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -132,6 +132,7 @@ public class HoodieSnapshotExporter {
         // No transformation is needed for output format "HUDI", just copy the original files.
         copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
       }
+      createSuccessTag(fs, cfg.targetOutputPath);
     } else {
       LOG.info("The job has 0 partition to copy.");
     }
@@ -205,6 +206,14 @@ public class HoodieSnapshotExporter {
     }
   }
 
+  private void createSuccessTag(FileSystem fs, String targetOutputPath) throws IOException {
+    Path successTagPath = new Path(targetOutputPath + "/_SUCCESS");
+    if (!fs.exists(successTagPath)) {
+      LOG.info(String.format("Creating _SUCCESS under target output path: %s", targetOutputPath));
+      fs.createNewFile(successTagPath);
+    }
+  }
+
   public static void main(String[] args) throws IOException {
     // Take input configs
     final Config cfg = new Config();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java
deleted file mode 100644
index 1a96b81..0000000
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities;
-
-import org.apache.hudi.common.TestRawTripPayload;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Test utils for data source tests.
- */
-public class DataSourceTestUtils {
-
-  public static Option<String> convertToString(HoodieRecord record) {
-    try {
-      String str = ((TestRawTripPayload) record.getData()).getJsonData();
-      str = "{" + str.substring(str.indexOf("\"timestamp\":"));
-      // Remove the last } bracket
-      str = str.substring(0, str.length() - 1);
-      return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() + "\"}");
-    } catch (IOException e) {
-      return Option.empty();
-    }
-  }
-
-  public static List<String> convertToStringList(List<HoodieRecord> records) {
-    return records.stream().map(DataSourceTestUtils::convertToString).filter(Option::isPresent).map(Option::get)
-        .collect(Collectors.toList());
-  }
-}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
index 920f1ed..f624247 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
@@ -18,205 +18,171 @@
 
 package org.apache.hudi.utilities;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.HoodieCommonTestHarness;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.model.HoodieTestUtils;
-import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.SparkSession;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-public class TestHoodieSnapshotExporter extends HoodieCommonTestHarness {
-  private static String TEST_WRITE_TOKEN = "1-0-1";
-
-  private SparkSession spark = null;
-  private HoodieTestDataGenerator dataGen = null;
-  private String outputPath = null;
-  private String rootPath = null;
-  private FileSystem fs = null;
-  private Map<String, String> commonOpts;
-  private HoodieSnapshotExporter.Config cfg;
-  private JavaSparkContext jsc = null;
-
-  @Before
-  public void initialize() throws IOException {
-    spark = SparkSession.builder()
-        .appName("Hoodie Datasource test")
-        .master("local[2]")
-        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-        .getOrCreate();
-    jsc = new JavaSparkContext(spark.sparkContext());
-    dataGen = new HoodieTestDataGenerator();
-    folder.create();
-    basePath = folder.getRoot().getAbsolutePath();
-    fs = FSUtils.getFs(basePath, spark.sparkContext().hadoopConfiguration());
-    commonOpts = new HashMap<>();
-
-    commonOpts.put("hoodie.insert.shuffle.parallelism", "4");
-    commonOpts.put("hoodie.upsert.shuffle.parallelism", "4");
-    commonOpts.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
-    commonOpts.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition");
-    commonOpts.put(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp");
-    commonOpts.put(HoodieWriteConfig.TABLE_NAME, "hoodie_test");
-
-
-    cfg = new HoodieSnapshotExporter.Config();
-
-    cfg.sourceBasePath = basePath;
-    cfg.targetOutputPath = outputPath = basePath + "/target";
-    cfg.outputFormat = "json";
-    cfg.outputPartitionField = "partition";
+@RunWith(Enclosed.class)
+public class TestHoodieSnapshotExporter {
+
+  static class ExporterTestHarness extends HoodieClientTestHarness {
+
+    static final Logger LOG = LogManager.getLogger(ExporterTestHarness.class);
+    static final int NUM_RECORDS = 100;
+    static final String COMMIT_TIME = "20200101000000";
+    static final String PARTITION_PATH = "2020/01/01";
+    static final String TABLE_NAME = "testing";
+    String sourcePath;
+    String targetPath;
+
+    @Before
+    public void setUp() throws Exception {
+      initSparkContexts();
+      initDFS();
+      dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH});
+
+      // Initialize test data dirs
+      sourcePath = dfsBasePath + "/source/";
+      targetPath = dfsBasePath + "/target/";
+      dfs.mkdirs(new Path(sourcePath));
+      dfs.mkdirs(new Path(targetPath));
+      HoodieTableMetaClient
+          .initTableType(jsc.hadoopConfiguration(), sourcePath, HoodieTableType.COPY_ON_WRITE, TABLE_NAME,
+              HoodieAvroPayload.class.getName());
+
+      // Prepare data as source Hudi dataset
+      HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath);
+      HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc, cfg);
+      hdfsWriteClient.startCommitWithTime(COMMIT_TIME);
+      List<HoodieRecord> records = dataGen.generateInserts(COMMIT_TIME, NUM_RECORDS);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      hdfsWriteClient.bulkInsert(recordsRDD, COMMIT_TIME);
+      hdfsWriteClient.close();
+
+      RemoteIterator<LocatedFileStatus> itr = dfs.listFiles(new Path(sourcePath), true);
+      while (itr.hasNext()) {
+        LOG.info(">>> Prepared test file: " + itr.next().getPath());
+      }
+    }
 
-  }
+    @After
+    public void tearDown() throws Exception {
+      cleanupSparkContexts();
+      cleanupDFS();
+      cleanupTestDataGenerator();
+    }
 
-  @After
-  public void cleanup() {
-    if (spark != null) {
-      spark.stop();
+    private HoodieWriteConfig getHoodieWriteConfig(String basePath) {
+      return HoodieWriteConfig.newBuilder()
+          .withPath(basePath)
+          .withEmbeddedTimelineServerEnabled(false)
+          .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+          .withParallelism(2, 2)
+          .withBulkInsertParallelism(2)
+          .forTable(TABLE_NAME)
+          .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
+          .build();
     }
   }
 
-  @Test
-  public void testSnapshotExporter() throws IOException {
-    // Insert Operation
-    List<String> records = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100));
-    Dataset<Row> inputDF = spark.read().json(new JavaSparkContext(spark.sparkContext()).parallelize(records, 2));
-    inputDF.write().format("hudi")
-        .options(commonOpts)
-        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
-        .mode(SaveMode.Overwrite)
-        .save(basePath);
-    long sourceCount = inputDF.count();
-
-    HoodieSnapshotExporter hoodieSnapshotExporter = new HoodieSnapshotExporter();
-    hoodieSnapshotExporter.export(spark, cfg);
-
-    long targetCount = spark.read().json(outputPath).count();
-
-    assertTrue(sourceCount == targetCount);
-
-    // Test Invalid OutputFormat
-    cfg.outputFormat = "foo";
-    int isError = hoodieSnapshotExporter.export(spark, cfg);
-    assertTrue(isError == -1);
-  }
+  public static class TestHoodieSnapshotExporterForHudi extends ExporterTestHarness {
+
+    @Test
+    public void testExportAsHudi() throws IOException {
+      HoodieSnapshotExporter.Config cfg = new Config();
+      cfg.sourceBasePath = sourcePath;
+      cfg.targetOutputPath = targetPath;
+      cfg.outputFormat = "hudi";
+      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+
+      // Check results
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean")));
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean.inflight")));
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".clean.requested")));
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit")));
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".commit.requested")));
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + ".inflight")));
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/hoodie.properties")));
+      String partition = targetPath + "/" + PARTITION_PATH;
+      long numParquetFiles = Arrays.stream(dfs.listStatus(new Path(partition)))
+          .filter(fileStatus -> fileStatus.getPath().toString().endsWith(".parquet"))
+          .count();
+      assertTrue("There should exist at least 1 parquet file.", numParquetFiles >= 1);
+      assertEquals(NUM_RECORDS, sqlContext.read().parquet(partition).count());
+      assertTrue(dfs.exists(new Path(partition + "/.hoodie_partition_metadata")));
+      assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+    }
 
-  // for testEmptySnapshotCopy
-  public void init() throws IOException {
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    rootPath = "file://" + folder.getRoot().getAbsolutePath();
-    basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
-    outputPath = rootPath + "/output";
-
-    final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
-    fs = FSUtils.getFs(basePath, hadoopConf);
-    HoodieTestUtils.init(hadoopConf, basePath);
+    @Test
+    public void testExportEmptyDataset() throws IOException {
+      // delete all source data
+      dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
+
+      // export
+      HoodieSnapshotExporter.Config cfg = new Config();
+      cfg.sourceBasePath = sourcePath;
+      cfg.targetOutputPath = targetPath;
+      cfg.outputFormat = "hudi";
+      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+
+      // Check results
+      assertEquals("Target path should be empty.", 0, dfs.listStatus(new Path(targetPath)).length);
+      assertFalse(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+    }
   }
 
-  @Test
-  public void testEmptySnapshotCopy() throws IOException {
-    init();
-    // There is no real data (only .hoodie directory)
-    assertEquals(fs.listStatus(new Path(basePath)).length, 1);
-    assertFalse(fs.exists(new Path(outputPath)));
-
-    // Do the snapshot
-    HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc, basePath, outputPath, true);
+  @RunWith(Parameterized.class)
+  public static class TestHoodieSnapshotExporterForNonHudi extends ExporterTestHarness {
 
-    // Nothing changed; we just bail out
-    assertEquals(fs.listStatus(new Path(basePath)).length, 1);
-    assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS")));
-  }
+    @Parameters
+    public static Iterable<String[]> formats() {
+      return Arrays.asList(new String[][] {{"json"}, {"parquet"}});
+    }
 
-  // TODO - uncomment this after fixing test failures
-  // @Test
-  public void testSnapshotCopy() throws Exception {
-    // Generate some commits and corresponding parquets
-    String commitTime1 = "20160501010101";
-    String commitTime2 = "20160502020601";
-    String commitTime3 = "20160506030611";
-    new File(basePath + "/.hoodie").mkdirs();
-    new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
-    // Only first two have commit files
-    new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
-    new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
-    new File(basePath + "/.hoodie/" + commitTime3 + ".inflight").createNewFile();
-
-    // Some parquet files
-    new File(basePath + "/2016/05/01/").mkdirs();
-    new File(basePath + "/2016/05/02/").mkdirs();
-    new File(basePath + "/2016/05/06/").mkdirs();
-    HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{"2016/05/01", "2016/05/02", "2016/05/06"},
-        basePath);
-    // Make commit1
-    File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));
-    file11.createNewFile();
-    File file12 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id12"));
-    file12.createNewFile();
-    File file13 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id13"));
-    file13.createNewFile();
-
-    // Make commit2
-    File file21 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id21"));
-    file21.createNewFile();
-    File file22 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id22"));
-    file22.createNewFile();
-    File file23 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id23"));
-    file23.createNewFile();
-
-    // Make commit3
-    File file31 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id31"));
-    file31.createNewFile();
-    File file32 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id32"));
-    file32.createNewFile();
-    File file33 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id33"));
-    file33.createNewFile();
-
-    // Do a snapshot copy
-    HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc, basePath, outputPath, false);
-
-    // Check results
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file12.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file13.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file21.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file22.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file23.getName())));
-    assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + file31.getName())));
-    assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + file32.getName())));
-    assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + file33.getName())));
-
-    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 + ".commit")));
-    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 + ".commit")));
-    assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".commit")));
-    assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".inflight")));
-    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
-
-    assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
+    @Parameter
+    public String format;
+
+    @Test
+    public void testExportAsNonHudi() throws IOException {
+      HoodieSnapshotExporter.Config cfg = new Config();
+      cfg.sourceBasePath = sourcePath;
+      cfg.targetOutputPath = targetPath;
+      cfg.outputFormat = format;
+      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+      assertEquals(NUM_RECORDS, sqlContext.read().format(format).load(targetPath).count());
+      assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+    }
   }
 }