From commits-return-11671-archive-asf-public=cust-asf.ponee.io@hudi.apache.org Sun Feb 16 18:16:39 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 63AC0180181 for ; Sun, 16 Feb 2020 19:16:39 +0100 (CET) Received: (qmail 14410 invoked by uid 500); 16 Feb 2020 18:16:38 -0000 Mailing-List: contact commits-help@hudi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hudi.apache.org Delivered-To: mailing list commits@hudi.apache.org Received: (qmail 14401 invoked by uid 99); 16 Feb 2020 18:16:38 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 16 Feb 2020 18:16:38 +0000 From: GitBox To: commits@hudi.apache.org Subject: [GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer Message-ID: <158187699870.20023.4993996502526871540.gitbox@gitbox.apache.org> References: In-Reply-To: Date: Sun, 16 Feb 2020 18:16:38 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit pratyakshsharma commented on a change in pull request #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer URL: https://github.com/apache/incubator-hudi/pull/1165#discussion_r379922401 ########## File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java ########## @@ -692,6 +698,146 @@ public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName()); } + private void prepareCsvDFSSource( + boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException { + String sourceRoot = dfsBasePath + "/csvFiles"; + String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0"; + + // Properties used for testing delta-streamer with CSV source + TypedProperties csvProps = new TypedProperties(); + csvProps.setProperty("include", "base.properties"); + csvProps.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField); + csvProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + if (useSchemaProvider) { + csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source-flattened.avsc"); + if (hasTransformer) { + csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target-flattened.avsc"); + } + } + csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot); + + if (sep != ',') { + if (sep == '\t') { + csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t"); + } else { + csvProps.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(sep)); + } + } + if (hasHeader) { + csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader)); + } + + UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV); + + String path = sourceRoot + "/1.csv"; + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + UtilitiesTestBase.Helpers.saveCsvToDFS( + hasHeader, sep, + Helpers.jsonifyRecords(dataGenerator.generateInserts("000", CSV_NUM_RECORDS, true)), + dfs, path); + } + + private void testCsvDFSSource( + boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) throws Exception { + prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassName != null); + String tableBasePath = dfsBasePath + "/test_csv_table" + testNum; + String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0"; + HoodieDeltaStreamer deltaStreamer = + new HoodieDeltaStreamer(TestHelpers.makeConfig( + tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(), + transformerClassName, PROPS_FILENAME_TEST_CSV, false, + useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc); + deltaStreamer.sync(); + TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); + testNum++; + } + + @Test + public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception { + // The CSV files have header, the columns are separated by ',', the default separator + // No schema provider is specified, no transformer is applied + // In this case, the source schema comes from the inferred schema of the CSV files + testCsvDFSSource(true, ',', false, null); + } + + @Test + public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws Exception { + // The CSV files have header, the columns are separated by '\t', + // which is passed in through the Hudi CSV properties + // No schema provider is specified, no transformer is applied + // In this case, the source schema comes from the inferred schema of the CSV files + testCsvDFSSource(true, '\t', false, null); + } + + @Test + public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws Exception { + // The CSV files have header, the columns are separated by '\t' + // File schema provider is used, no transformer is applied + // In this case, the source schema comes from the source Arvo schema file + testCsvDFSSource(true, '\t', true, null); + } + + @Test + public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer() throws Exception { + // The CSV files have header, the columns are separated by '\t' + // No schema provider is specified, transformer is applied + // In this case, the source schema comes from the inferred schema of the CSV files. + // Target schema is determined based on the Dataframe after transformation + testCsvDFSSource(true, '\t', false, TripsWithDistanceTransformer.class.getName()); + } + + @Test + public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception { + // The CSV files have header, the columns are separated by '\t' + // File schema provider is used, no transformer is applied + // In this case, the source and target schema come from the Arvo schema files + testCsvDFSSource(true, '\t', true, TripsWithDistanceTransformer.class.getName()); + } + + @Test + public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndNoTransformer() throws Exception { + // The CSV files do not have header, the columns are separated by '\t', + // which is passed in through the Hudi CSV properties + // No schema provider is specified, no transformer is applied + // In this case, the source schema comes from the inferred schema of the CSV files + // No CSV header and no schema provider at the same time are not recommended + // as the column names are not informative + testCsvDFSSource(false, '\t', false, null); + } + + @Test + public void testCsvDFSSourceNoHeaderWithSchemaProviderAndNoTransformer() throws Exception { + // The CSV files do not have header, the columns are separated by '\t' + // File schema provider is used, no transformer is applied + // In this case, the source schema comes from the source Arvo schema file Review comment: ditto ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services