hudi-commits mailing list archives

From sivaba...@apache.org
Subject [incubator-hudi] branch master updated: [HUDI-76] Add CSV Source support for Hudi Delta Streamer
Date Thu, 19 Mar 2020 14:01:02 GMT
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cf765df  [HUDI-76] Add CSV Source support for Hudi Delta Streamer
     new a752b7b  Merge pull request #1165 from yihua/HUDI-76-deltastreamer-csv-source
cf765df is described below

commit cf765df6062f896ad6bf0a8ddf711aad19dbf59b
Author: Y Ethan Guo <guoyihua@uber.com>
AuthorDate: Sat Jan 18 23:11:14 2020 -0800

    [HUDI-76] Add CSV Source support for Hudi Delta Streamer
---
 .../hudi/common/HoodieTestDataGenerator.java       |  92 +++++++++---
 hudi-utilities/pom.xml                             |   5 +
 .../hudi/utilities/sources/CsvDFSSource.java       | 126 ++++++++++++++++
 .../hudi/utilities/TestHoodieDeltaStreamer.java    | 164 +++++++++++++++++++--
 .../apache/hudi/utilities/UtilitiesTestBase.java   |  60 +++++++-
 .../utilities/sources/AbstractBaseTestSource.java  |   2 +-
 .../sources/AbstractDFSSourceTestBase.java         |   3 +-
 .../hudi/utilities/sources/TestCsvDFSSource.java   |  61 ++++++++
 .../delta-streamer-config/source-flattened.avsc    |  57 +++++++
 .../delta-streamer-config/target-flattened.avsc    |  60 ++++++++
 10 files changed, 597 insertions(+), 33 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index 6d86e93..3307881 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -74,20 +74,30 @@ public class HoodieTestDataGenerator {
   public static final String[] DEFAULT_PARTITION_PATHS =
       {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
   public static final int DEFAULT_PARTITION_DEPTH = 3;
-  public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+  public static final String TRIP_SCHEMA_PREFIX = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
      + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
      + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
      + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
-      + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
-      + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
-      + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
-      + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
+      + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},";
+  public static final String TRIP_SCHEMA_SUFFIX = "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
+  public static final String FARE_NESTED_SCHEMA = "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+      + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},";
+  public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"},"
+      + "{\"name\": \"currency\", \"type\": \"string\"},";
+
+  public static final String TRIP_EXAMPLE_SCHEMA =
+      TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+  public static final String TRIP_FLATTENED_SCHEMA =
+      TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+
   public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
   public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
                                                   + "struct<amount:double,currency:string>,boolean";
+
   public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
   public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
       HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
+  public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
 
   private static final Random RAND = new Random(46474747);
 
@@ -115,10 +125,33 @@ public class HoodieTestDataGenerator {
   }
 
   /**
-   * Generates a new avro record of the above schema format, retaining the key if optionally provided.
+   * Generates a new avro record of the above nested schema format,
+   * retaining the key if optionally provided.
+   *
+   * @param key  Hoodie key.
+   * @param commitTime  Commit time to use.
+   * @return  Raw payload of a test record.
+   * @throws IOException
    */
   public static TestRawTripPayload generateRandomValue(HoodieKey key, String commitTime) throws IOException {
-    GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0);
+    return generateRandomValue(key, commitTime, false);
+  }
+
+  /**
+   * Generates a new avro record with the specified schema (nested or flattened),
+   * retaining the key if optionally provided.
+   *
+   * @param key  Hoodie key.
+   * @param commitTime  Commit time to use.
+   * @param isFlattened  whether the schema of the record should be flattened.
+   * @return  Raw payload of a test record.
+   * @throws IOException
+   */
+  public static TestRawTripPayload generateRandomValue(
+      HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
+    GenericRecord rec = generateGenericRecord(
+        key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0,
+        false, isFlattened);
     return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
   }
 
@@ -127,7 +160,7 @@ public class HoodieTestDataGenerator {
    */
   public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key, String commitTime) throws IOException {
     GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0,
-        true);
+        true, false);
     return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
   }
 
@@ -141,12 +174,13 @@ public class HoodieTestDataGenerator {
 
   public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
                                                     double timestamp) {
-    return generateGenericRecord(rowKey, riderName, driverName, timestamp, false);
+    return generateGenericRecord(rowKey, riderName, driverName, timestamp, false, false);
   }
 
   public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
-                                                    double timestamp, boolean isDeleteRecord) {
-    GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
+                                                    double timestamp, boolean isDeleteRecord,
+                                                    boolean isFlattened) {
+    GenericRecord rec = new GenericData.Record(isFlattened ? FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
     rec.put("_row_key", rowKey);
     rec.put("timestamp", timestamp);
     rec.put("rider", riderName);
@@ -156,10 +190,15 @@ public class HoodieTestDataGenerator {
     rec.put("end_lat", RAND.nextDouble());
     rec.put("end_lon", RAND.nextDouble());
 
-    GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
-    fareRecord.put("amount", RAND.nextDouble() * 100);
-    fareRecord.put("currency", "USD");
-    rec.put("fare", fareRecord);
+    if (isFlattened) {
+      rec.put("fare", RAND.nextDouble() * 100);
+      rec.put("currency", "USD");
+    } else {
+      GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
+      fareRecord.put("amount", RAND.nextDouble() * 100);
+      fareRecord.put("currency", "USD");
+      rec.put("fare", fareRecord);
+    }
 
     if (isDeleteRecord) {
       rec.put("_hoodie_is_deleted", true);
@@ -230,16 +269,31 @@ public class HoodieTestDataGenerator {
   }
 
   /**
-   * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
+   * Generates new inserts with nested schema, uniformly across the partition paths above.
+   * It also updates the list of existing keys.
    */
   public List<HoodieRecord> generateInserts(String commitTime, Integer n) {
-    return generateInsertsStream(commitTime, n).collect(Collectors.toList());
+    return generateInserts(commitTime, n, false);
+  }
+
+  /**
+   * Generates new inserts, uniformly across the partition paths above.
+   * It also updates the list of existing keys.
+   *
+   * @param commitTime  Commit time to use.
+   * @param n  Number of records.
+   * @param isFlattened  whether the schema of the generated record is flattened
+   * @return  List of {@link HoodieRecord}s
+   */
+  public List<HoodieRecord> generateInserts(String commitTime, Integer n, boolean isFlattened) {
+    return generateInsertsStream(commitTime, n, isFlattened).collect(Collectors.toList());
   }
 
   /**
    * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
    */
-  public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n) {
+  public Stream<HoodieRecord> generateInsertsStream(
+      String commitTime, Integer n, boolean isFlattened) {
     int currSize = getNumExistingKeys();
 
     return IntStream.range(0, n).boxed().map(i -> {
@@ -251,7 +305,7 @@ public class HoodieTestDataGenerator {
       existingKeys.put(currSize + i, kp);
       numExistingKeys++;
       try {
-        return new HoodieRecord(key, generateRandomValue(key, commitTime));
+        return new HoodieRecord(key, generateRandomValue(key, commitTime, isFlattened));
       } catch (IOException e) {
         throw new HoodieIOException(e.getMessage(), e);
       }
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index f82243b..2324ae8 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -137,6 +137,11 @@
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-csv</artifactId>
+      <version>${fasterxml.version}</version>
+    </dependency>
 
     <!-- Parquet -->
     <dependency>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
new file mode 100644
index 0000000..b8ccd6e
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.avro.SchemaConverters;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Reads data from CSV files on DFS as the data source.
+ *
+ * Internally, we use Spark to read CSV files, so any limitation of Spark CSV also applies here
+ * (e.g., limited support for nested schema).
+ *
+ * You can set the CSV-specific configs in the format of hoodie.deltastreamer.csv.*
+ * that are Spark compatible to deal with CSV files in Hudi.  The supported options are:
+ *
+ *       "sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment",
+ *       "header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace",
+ *       "ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf",
+ *       "negativeInf", "dateFormat", "timestampFormat", "maxColumns", "maxCharsPerColumn",
+ *       "mode", "columnNameOfCorruptRecord", "multiLine"
+ *
+ * Detailed information of these CSV options can be found at:
+ * https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html#csv-scala.collection.Seq-
+ *
+ * If the source Avro schema is provided through the {@link org.apache.hudi.utilities.schema.FilebasedSchemaProvider}
+ * using "hoodie.deltastreamer.schemaprovider.source.schema.file" config, the schema is
+ * passed to the CSV reader without inferring the schema from the CSV file.
+ */
+public class CsvDFSSource extends RowSource {
+  // CsvSource config prefix
+  public static final String CSV_SRC_CONFIG_PREFIX = "hoodie.deltastreamer.csv.";
+  // CSV-specific configurations to pass in from Hudi to Spark
+  public static final List<String> CSV_CONFIG_KEYS = Arrays.asList(
+      "sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment",
+      "header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace",
+      "ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf",
+      "negativeInf", "dateFormat", "timestampFormat", "maxColumns", "maxCharsPerColumn",
+      "mode", "columnNameOfCorruptRecord", "multiLine"
+  );
+
+  private final DFSPathSelector pathSelector;
+  private final StructType sourceSchema;
+
+  public CsvDFSSource(TypedProperties props,
+      JavaSparkContext sparkContext,
+      SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration());
+    if (schemaProvider != null) {
+      sourceSchema = (StructType) SchemaConverters.toSqlType(schemaProvider.getSourceSchema())
+          .dataType();
+    } else {
+      sourceSchema = null;
+    }
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr,
+      long sourceLimit) {
+    Pair<Option<String>, String> selPathsWithMaxModificationTime =
+        pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
+    return Pair.of(fromFiles(
+        selPathsWithMaxModificationTime.getLeft()), selPathsWithMaxModificationTime.getRight());
+  }
+
+  /**
+   * Reads the CSV files and parses the lines into {@link Dataset} of {@link Row}.
+   *
+   * @param pathStr  The list of file paths, separated by ','.
+   * @return  {@link Dataset} of {@link Row} containing the records.
+   */
+  private Option<Dataset<Row>> fromFiles(Option<String> pathStr) {
+    if (pathStr.isPresent()) {
+      DataFrameReader dataFrameReader = sparkSession.read().format("csv");
+      CSV_CONFIG_KEYS.forEach(optionKey -> {
+        String configPropName = CSV_SRC_CONFIG_PREFIX + optionKey;
+        String value  = props.getString(configPropName, null);
+        // Pass down the Hudi CSV configs to Spark DataFrameReader
+        if (value != null) {
+          dataFrameReader.option(optionKey, value);
+        }
+      });
+      if (sourceSchema != null) {
+        // Source schema is specified, pass it to the reader
+        dataFrameReader.schema(sourceSchema);
+      }
+      dataFrameReader.option("inferSchema", Boolean.toString(sourceSchema == null));
+
+      return Option.of(dataFrameReader.load(pathStr.get().split(",")));
+    } else {
+      return Option.empty();
+    }
+  }
+}
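As the class javadoc above notes, any of the listed Spark CSV reader options can be forwarded through properties of the form hoodie.deltastreamer.csv.<option>. A minimal, illustrative DeltaStreamer properties snippet for this source could look as follows (paths and values are placeholders and not part of this commit; the keys are the ones exercised by the tests below):

    # Illustrative only -- paths and values are placeholders
    hoodie.datasource.write.recordkey.field=_row_key
    hoodie.deltastreamer.source.dfs.root=/path/to/csvFiles
    # CSV options forwarded to the Spark DataFrameReader, e.g. "header", "sep", "nullValue", ...
    hoodie.deltastreamer.csv.header=true
    # Optional: supply a source Avro schema instead of relying on inferSchema
    hoodie.deltastreamer.schemaprovider.source.schema.file=/path/to/source-flattened.avsc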
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 100faa2..9224be0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -19,8 +19,8 @@
 package org.apache.hudi.utilities;
 
 import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -42,6 +42,7 @@ import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.CsvDFSSource;
 import org.apache.hudi.utilities.sources.DistributedTestDataSource;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
@@ -60,6 +61,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
@@ -98,12 +100,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   private static final Random RANDOM = new Random();
   private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
   private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
+  private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
   private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
   private static final String PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
   private static final int PARQUET_NUM_RECORDS = 5;
+  private static final int CSV_NUM_RECORDS = 3;
   private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
 
-  private static int parquetTestNum = 1;
+  private static int testNum = 1;
 
   @BeforeClass
   public static void initClass() throws Exception {
@@ -114,7 +118,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties",
dfs,
         dfsBasePath + "/sql-transformer.properties");
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath
+ "/source.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs,
dfsBasePath + "/source-flattened.avsc");
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath
+ "/target.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs,
dfsBasePath + "/target-flattened.avsc");
 
     TypedProperties props = new TypedProperties();
     props.setProperty("include", "sql-transformer.properties");
@@ -197,12 +203,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
         String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
                                                  String payloadClassName, String tableType) {
       return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync,
-          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType);
+          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp");
     }
 
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
         String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
-        int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType) {
+        int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
@@ -211,7 +217,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       cfg.transformerClassName = transformerClassName;
       cfg.operation = op;
       cfg.enableHiveSync = enableHiveSync;
-      cfg.sourceOrderingField = "timestamp";
+      cfg.sourceOrderingField = sourceOrderingField;
       cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
       cfg.sourceLimit = sourceLimit;
       if (updatePayloadClass) {
@@ -653,7 +659,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     if (useSchemaProvider) {
       parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source.avsc");
       if (hasTransformer) {
-        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/target.avsc");
+        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target.avsc");
       }
     }
     parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
@@ -663,14 +669,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
   private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception {
     prepareParquetDFSSource(useSchemaProvider, transformerClassName != null);
-    String tableBasePath = dfsBasePath + "/test_parquet_table" + parquetTestNum;
+    String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(),
             transformerClassName, PROPS_FILENAME_TEST_PARQUET, false,
-            useSchemaProvider, 100000, false, null, null), jsc);
+            useSchemaProvider, 100000, false, null, null, "timestamp"), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
-    parquetTestNum++;
+    testNum++;
   }
 
   @Test
@@ -693,6 +699,146 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
   }
 
+  private void prepareCsvDFSSource(
+      boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+    String sourceRoot = dfsBasePath + "/csvFiles";
+    String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0";
+
+    // Properties used for testing delta-streamer with CSV source
+    TypedProperties csvProps = new TypedProperties();
+    csvProps.setProperty("include", "base.properties");
+    csvProps.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField);
+    csvProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    if (useSchemaProvider) {
+      csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath
+ "/source-flattened.avsc");
+      if (hasTransformer) {
+        csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath
+ "/target-flattened.avsc");
+      }
+    }
+    csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
+
+    if (sep != ',') {
+      if (sep == '\t') {
+        csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
+      } else {
+        csvProps.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(sep));
+      }
+    }
+    if (hasHeader) {
+      csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader));
+    }
+
+    UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV);
+
+    String path = sourceRoot + "/1.csv";
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    UtilitiesTestBase.Helpers.saveCsvToDFS(
+        hasHeader, sep,
+        Helpers.jsonifyRecords(dataGenerator.generateInserts("000", CSV_NUM_RECORDS, true)),
+        dfs, path);
+  }
+
+  private void testCsvDFSSource(
+      boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) throws Exception {
+    prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassName != null);
+    String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
+    String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
+    HoodieDeltaStreamer deltaStreamer =
+        new HoodieDeltaStreamer(TestHelpers.makeConfig(
+            tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
+            transformerClassName, PROPS_FILENAME_TEST_CSV, false,
+            useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+    testNum++;
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
+    // The CSV files have header, the columns are separated by ',', the default separator
+    // No schema provider is specified, no transformer is applied
+    // In this case, the source schema comes from the inferred schema of the CSV files
+    testCsvDFSSource(true, ',', false, null);
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws Exception {
+    // The CSV files have header, the columns are separated by '\t',
+    // which is passed in through the Hudi CSV properties
+    // No schema provider is specified, no transformer is applied
+    // In this case, the source schema comes from the inferred schema of the CSV files
+    testCsvDFSSource(true, '\t', false, null);
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws Exception {
+    // The CSV files have header, the columns are separated by '\t'
+    // File schema provider is used, no transformer is applied
+    // In this case, the source schema comes from the source Avro schema file
+    testCsvDFSSource(true, '\t', true, null);
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer() throws Exception {
+    // The CSV files have header, the columns are separated by '\t'
+    // No schema provider is specified, transformer is applied
+    // In this case, the source schema comes from the inferred schema of the CSV files.
+    // Target schema is determined based on the Dataframe after transformation
+    testCsvDFSSource(true, '\t', false, TripsWithDistanceTransformer.class.getName());
+  }
+
+  @Test
+  public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception {
+    // The CSV files have header, the columns are separated by '\t'
+    // File schema provider is used, transformer is applied
+    // In this case, the source and target schema come from the Avro schema files
+    testCsvDFSSource(true, '\t', true, TripsWithDistanceTransformer.class.getName());
+  }
+
+  @Test
+  public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
+    // The CSV files do not have header, the columns are separated by '\t',
+    // which is passed in through the Hudi CSV properties
+    // No schema provider is specified, no transformer is applied
+    // In this case, the source schema comes from the inferred schema of the CSV files
+    // Using neither a CSV header nor a schema provider is not recommended,
+    // as the column names are not informative
+    testCsvDFSSource(false, '\t', false, null);
+  }
+
+  @Test
+  public void testCsvDFSSourceNoHeaderWithSchemaProviderAndNoTransformer() throws Exception {
+    // The CSV files do not have header, the columns are separated by '\t'
+    // File schema provider is used, no transformer is applied
+    // In this case, the source schema comes from the source Avro schema file
+    testCsvDFSSource(false, '\t', true, null);
+  }
+
+  @Test
+  public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndWithTransformer() throws Exception {
+    // The CSV files do not have header, the columns are separated by '\t'
+    // No schema provider is specified, transformer is applied
+    // In this case, the source schema comes from the inferred schema of the CSV files.
+    // Target schema is determined based on the Dataframe after transformation
+    // Using neither a CSV header nor a schema provider is not recommended,
+    // as the transformer behavior may be unexpected
+    try {
+      testCsvDFSSource(false, '\t', false, TripsWithDistanceTransformer.class.getName());
+      fail("Should error out when doing the transformation.");
+    } catch (AnalysisException e) {
+      LOG.error("Expected error during transformation", e);
+      assertTrue(e.getMessage().contains("cannot resolve '`begin_lat`' given input columns:"));
+    }
+  }
+
+  @Test
+  public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
+    // The CSV files do not have header, the columns are separated by '\t'
+    // File schema provider is used, transformer is applied
+    // In this case, the source and target schema come from the Avro schema files
+    testCsvDFSSource(false, '\t', true, TripsWithDistanceTransformer.class.getName());
+  }
+
   /**
    * UDF to calculate Haversine distance.
    */
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
index 1fcd99a..abf6578 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
@@ -27,11 +27,20 @@ import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveClient;
 import org.apache.hudi.hive.util.HiveTestService;
 import org.apache.hudi.utilities.sources.TestDataSource;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
 import com.google.common.collect.ImmutableList;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -42,6 +51,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.server.HiveServer2;
 import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
@@ -56,6 +66,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -72,6 +83,7 @@ public class UtilitiesTestBase {
   protected transient SparkSession sparkSession = null;
   protected transient SQLContext sqlContext;
   protected static HiveServer2 hiveServer;
+  private static ObjectMapper mapper = new ObjectMapper();
 
   @BeforeClass
   public static void initClass() throws Exception {
@@ -193,9 +205,47 @@ public class UtilitiesTestBase {
       os.close();
     }
 
+    /**
+     * Converts the json records into CSV format and writes to a file.
+     *
+     * @param hasHeader  whether the CSV file should have a header line.
+     * @param sep  the column separator to use.
+     * @param lines  the records in JSON format.
+     * @param fs  {@link FileSystem} instance.
+     * @param targetPath  File path.
+     * @throws IOException
+     */
+    public static void saveCsvToDFS(
+        boolean hasHeader, char sep,
+        String[] lines, FileSystem fs, String targetPath) throws IOException {
+      Builder csvSchemaBuilder = CsvSchema.builder();
+
+      ArrayNode arrayNode = mapper.createArrayNode();
+      Arrays.stream(lines).forEachOrdered(
+          line -> {
+            try {
+              arrayNode.add(mapper.readValue(line, ObjectNode.class));
+            } catch (IOException e) {
+              throw new HoodieIOException(
+                  "Error converting json records into CSV format: " + e.getMessage());
+            }
+          });
+      arrayNode.get(0).fieldNames().forEachRemaining(csvSchemaBuilder::addColumn);
+      ObjectWriter csvObjWriter = new CsvMapper()
+          .writerFor(JsonNode.class)
+          .with(csvSchemaBuilder.setUseHeader(hasHeader).setColumnSeparator(sep).build());
+      PrintStream os = new PrintStream(fs.create(new Path(targetPath), true));
+      csvObjWriter.writeValue(os, arrayNode);
+      os.flush();
+      os.close();
+    }
+
     public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
       try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile)
-          .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
+          .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA)
+          .withConf(HoodieTestUtils.getDefaultHadoopConf())
+          .withWriteMode(Mode.OVERWRITE)
+          .build()) {
         for (GenericRecord record : records) {
           writer.write(record);
         }
@@ -203,9 +253,13 @@ public class UtilitiesTestBase {
     }
 
     public static TypedProperties setupSchemaOnDFS() throws IOException {
-      UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath
+ "/source.avsc");
+      return setupSchemaOnDFS("source.avsc");
+    }
+
+    public static TypedProperties setupSchemaOnDFS(String filename) throws IOException {
+      UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/" + filename, dfs, dfsBasePath
+ "/" + filename);
       TypedProperties props = new TypedProperties();
-      props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath
+ "/source.avsc");
+      props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath
+ "/" + filename);
       return props;
     }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
index c8dc5d5..175edde 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
@@ -123,7 +123,7 @@ public abstract class AbstractBaseTestSource extends AvroSource {
       updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
           .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
     }
-    Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts)
+    Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts, false)
         .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
     return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream));
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java
index 5815317..42cbebc 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java
@@ -56,6 +56,7 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
   String dfsRoot;
   String fileSuffix;
   HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+  boolean useFlattenedSchema = false;
 
   @BeforeClass
   public static void initClass() throws Exception {
@@ -105,7 +106,7 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase {
    */
   Path generateOneFile(String filename, String commitTime, int n) throws IOException {
     Path path = new Path(dfsRoot, filename + fileSuffix);
-    writeNewDataToFile(dataGenerator.generateInserts(commitTime, n), path);
+    writeNewDataToFile(dataGenerator.generateInserts(commitTime, n, useFlattenedSchema), path);
     return path;
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
new file mode 100644
index 0000000..fbb6d8f
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.utilities.UtilitiesTestBase;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Basic tests for {@link CsvDFSSource}.
+ */
+public class TestCsvDFSSource extends AbstractDFSSourceTestBase {
+
+  @Before
+  public void setup() throws Exception {
+    super.setup();
+    this.dfsRoot = dfsBasePath + "/jsonFiles";
+    this.fileSuffix = ".json";
+    this.useFlattenedSchema = true;
+    this.schemaProvider = new FilebasedSchemaProvider(
+        Helpers.setupSchemaOnDFS("source-flattened.avsc"), jsc);
+  }
+
+  @Override
+  Source prepareDFSSource() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    props.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(true));
+    props.setProperty("hoodie.deltastreamer.csv.sep", "\t");
+    return new CsvDFSSource(props, jsc, sparkSession, schemaProvider);
+  }
+
+  @Override
+  void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
+    UtilitiesTestBase.Helpers.saveCsvToDFS(
+        true, '\t', Helpers.jsonifyRecords(records), dfs, path.toString());
+  }
+}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source-flattened.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source-flattened.avsc
new file mode 100644
index 0000000..ed3a7be
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source-flattened.avsc
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "triprec",
+  "fields" : [
+  {
+    "name" : "timestamp",
+    "type" : "double"
+  }, {
+    "name" : "_row_key",
+    "type" : "string"
+  }, {
+    "name" : "rider",
+    "type" : "string"
+  }, {
+    "name" : "driver",
+    "type" : "string"
+  }, {
+    "name" : "begin_lat",
+    "type" : "double"
+  }, {
+    "name" : "begin_lon",
+    "type" : "double"
+  }, {
+    "name" : "end_lat",
+    "type" : "double"
+  }, {
+    "name" : "end_lon",
+    "type" : "double"
+  }, {
+    "name" : "fare",
+    "type" : "double"
+  }, {
+    "name" : "currency",
+    "type" : "string"
+  }, {
+    "name" : "_hoodie_is_deleted",
+    "type" : "boolean",
+    "default" : false
+  } ]
+}
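For illustration, a CSV file conforming to this flattened source schema, written with a header row and the default ',' separator, would look roughly like the two lines below (values are made up; the column order of the files generated in the tests follows the field order of the jsonified records):

    timestamp,_row_key,rider,driver,begin_lat,begin_lon,end_lat,end_lon,fare,currency,_hoodie_is_deleted
    0.0,key-000-0,rider-000,driver-000,0.47,0.61,0.13,0.88,42.5,USD,false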
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target-flattened.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target-flattened.avsc
new file mode 100644
index 0000000..4e9e4af
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/target-flattened.avsc
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "triprec",
+  "fields" : [
+  {
+    "name" : "timestamp",
+    "type" : "double"
+  }, {
+    "name" : "_row_key",
+    "type" : "string"
+  }, {
+    "name" : "rider",
+    "type" : "string"
+  }, {
+    "name" : "driver",
+    "type" : "string"
+  }, {
+    "name" : "begin_lat",
+    "type" : "double"
+  }, {
+    "name" : "begin_lon",
+    "type" : "double"
+  }, {
+    "name" : "end_lat",
+    "type" : "double"
+  }, {
+    "name" : "end_lon",
+    "type" : "double"
+  }, {
+    "name" : "fare",
+    "type" : "double"
+  }, {
+    "name" : "currency",
+    "type" : "string"
+  }, {
+    "name" : "_hoodie_is_deleted",
+    "type" : "boolean",
+    "default" : false
+  }, {
+    "name" : "haversine_distance",
+    "type" : "double"
+  }]
+}

