hudi-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [incubator-hudi] yihua commented on issue #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer
Date Wed, 11 Mar 2020 05:58:43 GMT
yihua commented on issue #1165: [HUDI-76] Add CSV Source support for Hudi Delta Streamer
URL: https://github.com/apache/incubator-hudi/pull/1165#issuecomment-597457998
 
 
   > One question about using nested schema. Can you remind me what happens if someone passes in a nested schema for CsvDeltaStreamer?
   
   I used the code below (the full diff is in the appendix) to test a nested schema with the CSV reader in Spark.  It throws the following exception, which means that the Spark CSV source does not currently support nested schemas.
   
   In most cases, CSV schemas should be flattened anyway.  Whether a nested schema works for the CSV source is determined entirely by Spark's behavior (nested schemas may be supported for CSV in the future), so we don't enforce this check in the Hudi code.
   
   ```
   org.apache.spark.sql.AnalysisException: CSV data source does not support struct<amount:double,currency:string> data type.;
   
   	at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:69)
   	at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:67)
   	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
   	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
   	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
   	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:67)
   	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifyReadSchema(DataSourceUtils.scala:41)
   	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:400)
   	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
   	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
   	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:188)
   	at org.apache.hudi.utilities.sources.CsvDFSSource.fromFiles(CsvDFSSource.java:120)
   	at org.apache.hudi.utilities.sources.CsvDFSSource.fetchNextBatch(CsvDFSSource.java:93)
   	at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
   	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:73)
   	at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:66)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:317)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:226)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:121)
   	at org.apache.hudi.utilities.TestHoodieDeltaStreamer.testCsvDFSSourceWithNestedSchema(TestHoodieDeltaStreamer.java:812)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
   	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
   	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
   	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
   	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
   	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
   	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
   	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
   	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
   	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
   	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
   	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
   	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
   	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
   	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
   	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
   	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
   	at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
   	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
   	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
   	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
   	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
   ```
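   
   For reference, here is a minimal standalone sketch of the same check using the Spark Java API directly (hypothetical code, not part of this PR: the class name and the CSV path are made up, and the path is assumed to point at an existing CSV file):
   
   ```
   import org.apache.spark.sql.SparkSession;
   import org.apache.spark.sql.types.DataTypes;
   import org.apache.spark.sql.types.StructField;
   import org.apache.spark.sql.types.StructType;
   
   public class NestedCsvSchemaRepro {
     public static void main(String[] args) {
       SparkSession spark = SparkSession.builder()
           .appName("nested-csv-schema-repro").master("local[1]").getOrCreate();
   
       // Nested field mirroring the fare struct from the exception above:
       // struct<amount:double,currency:string>
       StructType fare = new StructType(new StructField[]{
           DataTypes.createStructField("amount", DataTypes.DoubleType, true),
           DataTypes.createStructField("currency", DataTypes.StringType, true)});
       StructType schema = new StructType(new StructField[]{
           DataTypes.createStructField("_row_key", DataTypes.StringType, false),
           DataTypes.createStructField("fare", fare, true)});
   
       // Spark verifies the user-specified read schema while resolving the
       // relation (DataSourceUtils.verifyReadSchema in the trace above), so
       // this line throws the AnalysisException before any action runs.
       spark.read().schema(schema).csv("/tmp/input.csv");
     }
   }
   ```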
   
   Appendix: a simple diff for testing nested CSV schema:
   ```
   diff --git a/hudi-utilities/null/parquetFiles/.1.parquet.crc b/hudi-utilities/null/parquetFiles/.1.parquet.crc
   new file mode 100644
   index 00000000..f48941c4
   Binary files /dev/null and b/hudi-utilities/null/parquetFiles/.1.parquet.crc differ
   diff --git a/hudi-utilities/null/parquetFiles/1.parquet b/hudi-utilities/null/parquetFiles/1.parquet
   new file mode 100644
   index 00000000..7780cb89
   Binary files /dev/null and b/hudi-utilities/null/parquetFiles/1.parquet differ
    diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
   index 4b69d223..e2921a5f 100644
   --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
   +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
   @@ -21,7 +21,6 @@ package org.apache.hudi.utilities.deltastreamer;
    import org.apache.hudi.AvroConversionUtils;
    import org.apache.hudi.DataSourceUtils;
    import org.apache.hudi.client.HoodieWriteClient;
   -import org.apache.hudi.keygen.KeyGenerator;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.model.HoodieRecord;
   @@ -40,6 +39,7 @@ import org.apache.hudi.exception.HoodieException;
    import org.apache.hudi.hive.HiveSyncConfig;
    import org.apache.hudi.hive.HiveSyncTool;
    import org.apache.hudi.index.HoodieIndex;
   +import org.apache.hudi.keygen.KeyGenerator;
    import org.apache.hudi.utilities.UtilHelpers;
    import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
    import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
   @@ -332,6 +332,7 @@ public class DeltaSync implements Serializable {
        }
    
        JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
   +    List<GenericRecord> r = avroRDD.collect();
        JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
           HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
               (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false));
    diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
   index 9e289f10..6f0cc8f8 100644
   --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
   +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java
    @@ -41,6 +41,7 @@ public abstract class RowSource extends Source<Dataset<Row>> {
      @Override
       protected final InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
         Pair<Option<Dataset<Row>>, String> res = fetchNextBatch(lastCkptStr, sourceLimit);
   +    Row[] x = (Row[]) res.getKey().get().collect();
        return res.getKey().map(dsr -> {
          SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(dsr.schema());
          return new InputBatch<>(res.getKey(), res.getValue(), rowSchemaProvider);
    diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
   index 43f76904..46761703 100644
   --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
   +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
   @@ -19,7 +19,6 @@
    package org.apache.hudi.utilities;
    
    import org.apache.hudi.DataSourceWriteOptions;
   -import org.apache.hudi.keygen.SimpleKeyGenerator;
    import org.apache.hudi.common.HoodieTestDataGenerator;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.model.HoodieTableType;
   @@ -33,11 +32,12 @@ import org.apache.hudi.common.util.FSUtils;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.TypedProperties;
    import org.apache.hudi.config.HoodieCompactionConfig;
   -import org.apache.hudi.exception.TableNotFoundException;
    import org.apache.hudi.exception.HoodieException;
   +import org.apache.hudi.exception.TableNotFoundException;
    import org.apache.hudi.hive.HiveSyncConfig;
    import org.apache.hudi.hive.HoodieHiveClient;
    import org.apache.hudi.hive.MultiPartKeysValueExtractor;
   +import org.apache.hudi.keygen.SimpleKeyGenerator;
    import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
    import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
    import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
   @@ -101,6 +101,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
      private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
      private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
      private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
   +  private static final String PROPS_FILENAME_TEST_CSV_NESTED = "test-csv-dfs-source-nested.properties";
      private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
      private static final String PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
      private static final int PARQUET_NUM_RECORDS = 5;
   @@ -728,7 +729,51 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
          csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader));
        }
    
   -    UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV);
   +    UtilitiesTestBase.Helpers
   +        .savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV);
   +
   +    String path = sourceRoot + "/1.csv";
   +    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
   +    UtilitiesTestBase.Helpers.saveCsvToDFS(
   +        hasHeader, sep,
   +        Helpers.jsonifyRecords(dataGenerator.generateInserts("000", CSV_NUM_RECORDS, true)),
   +        dfs, path);
   +  }
   +
   +  private void prepareCsvDFSSourceNested(
   +      boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer)
   +      throws IOException {
   +    String sourceRoot = dfsBasePath + "/csvFiles";
   +    String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0";
   +
   +    // Properties used for testing delta-streamer with CSV source
   +    TypedProperties csvProps = new TypedProperties();
   +    csvProps.setProperty("include", "base.properties");
   +    csvProps.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField);
   +    csvProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
   +    if (useSchemaProvider) {
   +      csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
   +          dfsBasePath + "/source.avsc");
   +      if (hasTransformer) {
   +        csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
   +            dfsBasePath + "/target.avsc");
   +      }
   +    }
   +    csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
   +
   +    if (sep != ',') {
   +      if (sep == '\t') {
   +        csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
   +      } else {
   +        csvProps.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(sep));
   +      }
   +    }
   +    if (hasHeader) {
   +      csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader));
   +    }
   +
   +    UtilitiesTestBase.Helpers
   +        .savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV_NESTED);
    
        String path = sourceRoot + "/1.csv";
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
   @@ -739,7 +784,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
      }
    
      private void testCsvDFSSource(
    -      boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) throws Exception {
   +      boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName)
   +      throws Exception {
        prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassName != null);
        String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
        String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
   @@ -753,6 +799,25 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
        testNum++;
      }
    
   +  @Test
   +  public void testCsvDFSSourceWithNestedSchema() throws Exception {
   +    prepareCsvDFSSourceNested(true, ',', true, false);
   +    String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
   +    String sourceOrderingField = "timestamp";
   +    HoodieDeltaStreamer deltaStreamer =
   +        new HoodieDeltaStreamer(TestHelpers.makeConfig(
   +            tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
   +            null, PROPS_FILENAME_TEST_CSV_NESTED, false,
   +            true, 1000, false, null, null, sourceOrderingField), jsc);
   +    deltaStreamer.sync();
   +
   +    Row[] x = (Row[]) sqlContext.read().format("org.apache.hudi")
   +        .load(tableBasePath + "/*/*.parquet")
   +        .collect();
   +    TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
   +    testNum++;
   +  }
   +
      @Test
       public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
        // The CSV files have header, the columns are separated by ',', the default separator
   
   ```
