gobblin-commits mailing list archives

From a...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-248] Converter for Json to Parquet
Date Wed, 18 Oct 2017 05:32:33 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 626d312a2 -> f0582115b


[GOBBLIN-248] Converter for Json to Parquet

Closes #2101 from tilakpatidar/parquet


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f0582115
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f0582115
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f0582115

Branch: refs/heads/master
Commit: f0582115ba3d655febec9af9e395ed858efcd335
Parents: 626d312
Author: tilakpatidar <tilakpatidar@gmail.com>
Authored: Wed Oct 18 11:02:17 2017 +0530
Committer: Abhishek Tiwari <abhishektiwari.btech@gmail.com>
Committed: Wed Oct 18 11:02:17 2017 +0530

----------------------------------------------------------------------
 gobblin-modules/gobblin-parquet/build.gradle    |   1 +
 .../parquet/JsonElementConversionFactory.java   | 485 +++++++++++++++++++
 ...JsonIntermediateToParquetGroupConverter.java |  57 +++
 .../gobblin/converter/parquet/JsonSchema.java   | 162 +++++++
 .../gobblin/converter/parquet/ParquetGroup.java | 233 +++++++++
 ...IntermediateToParquetGroupConverterTest.java | 128 +++++
 .../JsonIntermediateToParquetConverter.json     | 223 +++++++++
 7 files changed, 1289 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/build.gradle b/gobblin-modules/gobblin-parquet/build.gradle
index e43f543..75530b9 100644
--- a/gobblin-modules/gobblin-parquet/build.gradle
+++ b/gobblin-modules/gobblin-parquet/build.gradle
@@ -20,6 +20,7 @@ apply plugin: 'java'
 dependencies {
   compile project(":gobblin-core")
 
+  compile externalDependency.gson
   compile externalDependency.parquet
 
   testCompile externalDependency.testng

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
new file mode 100644
index 0000000..1d3636a
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.parquet;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.converter.parquet.JsonSchema.*;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import parquet.example.data.Group;
+import parquet.example.data.simple.BinaryValue;
+import parquet.example.data.simple.BooleanValue;
+import parquet.example.data.simple.DoubleValue;
+import parquet.example.data.simple.FloatValue;
+import parquet.example.data.simple.IntegerValue;
+import parquet.example.data.simple.LongValue;
+import parquet.io.api.Binary;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
+import parquet.schema.Types;
+
+import static org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.CHILD;
+import static org.apache.gobblin.converter.parquet.JsonSchema.*;
+import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.STRING;
+import static parquet.schema.OriginalType.UTF8;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static parquet.schema.Type.Repetition.OPTIONAL;
+import static parquet.schema.Type.Repetition.REPEATED;
+
+
+/**
+ * <p>
+ * Creates a JsonElement to Parquet converter for all supported data types.
+ * </p>
+ *
+ * @author tilakpatidar
+ *
+ */
+public class JsonElementConversionFactory {
+
+  /**
+   * Creates a converter for a single field from its {@link JsonSchema}.
+   *
+   * @param schema the {@link JsonSchema} describing the field
+   * @param repeated whether the {@link Type} is repeated in the parent {@link Group}
+   * @return a {@link JsonElementConverter} for the field
+   */
+  public static JsonElementConverter getConverter(JsonSchema schema, boolean repeated) {
+
+    InputType fieldType = schema.getInputType();
+    switch (fieldType) {
+      case INT:
+        return new IntConverter(schema, repeated);
+
+      case LONG:
+        return new LongConverter(schema, repeated);
+
+      case FLOAT:
+        return new FloatConverter(schema, repeated);
+
+      case DOUBLE:
+        return new DoubleConverter(schema, repeated);
+
+      case BOOLEAN:
+        return new BooleanConverter(schema, repeated);
+
+      case STRING:
+        return new StringConverter(schema, repeated);
+
+      case ARRAY:
+        return new ArrayConverter(schema);
+
+      case ENUM:
+        return new EnumConverter(schema);
+
+      case RECORD:
+        return new RecordConverter(schema);
+
+      case MAP:
+        return new MapConverter(schema);
+
+      default:
+        throw new UnsupportedOperationException(fieldType + " is unsupported");
+    }
+  }
+
+  /**
+   * Converts a JsonElement into a supported ParquetType
+   * @author tilakpatidar
+   *
+   */
+  public static abstract class JsonElementConverter {
+    protected final JsonSchema jsonSchema;
+
+    protected JsonElementConverter(JsonSchema schema) {
+      this.jsonSchema = schema;
+    }
+
+    /**
+     * Convert value to a parquet type and perform null check.
+     * @param value
+     * @return Parquet safe type
+     */
+    public Object convert(JsonElement value) {
+      if (value.isJsonNull()) {
+        if (this.jsonSchema.isNullable()) {
+          return null;
+        }
+        throw new RuntimeException(
+            "Field: " + this.jsonSchema.getColumnName() + " is not nullable and contains
a null value");
+      }
+      return convertField(value);
+    }
+
+    /**
+     * Returns a {@link Type} parquet schema
+     * @return
+     */
+    abstract public Type schema();
+
+    /**
+     * Convert JsonElement to Parquet type
+     * @param value
+     * @return
+     */
+    abstract Object convertField(JsonElement value);
+  }
+
+  /**
+   * Converts a {@link JsonSchema} to a {@link PrimitiveType}
+   */
+  public static abstract class PrimitiveConverter extends JsonElementConverter {
+    protected final boolean repeated;
+    private PrimitiveTypeName outputType;
+    protected Type schema;
+
+    /**
+     * @param jsonSchema
+     * @param repeated
+     * @param outputType
+     */
+    public PrimitiveConverter(JsonSchema jsonSchema, boolean repeated, PrimitiveTypeName outputType) {
+      super(jsonSchema);
+      this.repeated = repeated;
+      this.outputType = outputType;
+      this.schema = buildSchema();
+    }
+
+    protected Type buildSchema() {
+      return new PrimitiveType(this.repeated ? REPEATED : this.jsonSchema.optionalOrRequired(), this.outputType,
+          this.jsonSchema.getColumnName());
+    }
+
+    @Override
+    public Type schema() {
+      return this.schema;
+    }
+  }
+
+  /**
+   * Converts a {@link JsonSchema} holding a collection of elements of some {@link InputType} into a {@link GroupType}.
+   */
+  public static abstract class CollectionConverter extends JsonElementConverter {
+    protected InputType elementType;
+    protected JsonElementConverter elementConverter;
+    protected Type schema;
+
+    public CollectionConverter(JsonSchema collectionSchema, InputType elementType, boolean repeated) {
+      super(collectionSchema);
+      this.elementType = elementType;
+      this.elementConverter = getConverter(getElementSchema(), repeated);
+      this.schema = buildSchema();
+    }
+
+    @Override
+    public Type schema() {
+      return this.schema;
+    }
+
+    /**
+     * Prepare a {@link JsonSchema} for the elements in a collection.
+     * @return
+     */
+    abstract JsonSchema getElementSchema();
+
+    abstract Type buildSchema();
+  }
+
+  public static class IntConverter extends PrimitiveConverter {
+
+    public IntConverter(JsonSchema schema, boolean repeated) {
+      super(schema, repeated, INT32);
+    }
+
+    @Override
+    IntegerValue convertField(JsonElement value) {
+      return new IntegerValue(value.getAsInt());
+    }
+  }
+
+  public static class LongConverter extends PrimitiveConverter {
+
+    public LongConverter(JsonSchema schema, boolean repeated) {
+      super(schema, repeated, INT64);
+    }
+
+    @Override
+    LongValue convertField(JsonElement value) {
+      return new LongValue(value.getAsLong());
+    }
+  }
+
+  public static class FloatConverter extends PrimitiveConverter {
+
+    public FloatConverter(JsonSchema schema, boolean repeated) {
+      super(schema, repeated, PrimitiveTypeName.FLOAT);
+    }
+
+    @Override
+    FloatValue convertField(JsonElement value) {
+      return new FloatValue(value.getAsFloat());
+    }
+  }
+
+  public static class DoubleConverter extends PrimitiveConverter {
+
+    public DoubleConverter(JsonSchema schema, boolean repeated) {
+      super(schema, repeated, PrimitiveTypeName.DOUBLE);
+    }
+
+    @Override
+    DoubleValue convertField(JsonElement value) {
+      return new DoubleValue(value.getAsDouble());
+    }
+  }
+
+  public static class BooleanConverter extends PrimitiveConverter {
+
+    public BooleanConverter(JsonSchema schema, boolean repeated) {
+      super(schema, repeated, PrimitiveTypeName.BOOLEAN);
+    }
+
+    @Override
+    BooleanValue convertField(JsonElement value) {
+      return new BooleanValue(value.getAsBoolean());
+    }
+  }
+
+  public static class StringConverter extends PrimitiveConverter {
+
+    public StringConverter(JsonSchema schema, boolean repeated) {
+      super(schema, repeated, BINARY);
+      this.schema = buildSchema();
+    }
+
+    @Override
+    BinaryValue convertField(JsonElement value) {
+      return new BinaryValue(Binary.fromString(value.getAsString()));
+    }
+
+    @Override
+    protected Type buildSchema() {
+      String columnName = this.jsonSchema.getColumnName();
+      if (this.repeated) {
+        return Types.repeated(BINARY).as(UTF8).named(columnName);
+      }
+      switch (this.jsonSchema.optionalOrRequired()) {
+        case OPTIONAL:
+          return Types.optional(BINARY).as(UTF8).named(columnName);
+        case REQUIRED:
+          return Types.required(BINARY).as(UTF8).named(columnName);
+        default:
+          throw new RuntimeException("Unsupported Repetition type");
+      }
+    }
+  }
+
+  public static class ArrayConverter extends CollectionConverter {
+
+    public ArrayConverter(JsonSchema arraySchema) {
+      super(arraySchema, arraySchema.getElementTypeUsingKey(ARRAY_ITEMS_KEY), true);
+    }
+
+    @Override
+    Object convertField(JsonElement value) {
+      ParquetGroup array = new ParquetGroup((GroupType) schema());
+      JsonElementConverter converter = this.elementConverter;
+      for (JsonElement elem : (JsonArray) value) {
+        array.add(ARRAY_KEY, converter.convert(elem));
+      }
+      return array;
+    }
+
+    @Override
+    protected Type buildSchema() {
+      List<Type> fields = new ArrayList<>();
+      fields.add(0, this.elementConverter.schema());
+      return new GroupType(this.jsonSchema.optionalOrRequired(), this.jsonSchema.getColumnName(), fields);
+    }
+
+    @Override
+    JsonSchema getElementSchema() {
+      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(this.elementType);
+      jsonSchema.setColumnName(ARRAY_KEY);
+      return jsonSchema;
+    }
+  }
+
+  public static class EnumConverter extends CollectionConverter {
+    private final HashSet<String> symbols = new HashSet<>();
+
+    public EnumConverter(JsonSchema enumSchema) {
+      super(enumSchema, STRING, false);
+      JsonArray symbolsArray = enumSchema.getSymbols();
+      symbolsArray.forEach(e -> symbols.add(e.getAsString()));
+    }
+
+    @Override
+    Object convertField(JsonElement value) {
+      if (symbols.contains(value.getAsString()) || this.jsonSchema.isNullable()) {
+        return this.elementConverter.convert(value);
+      }
+      throw new RuntimeException("Symbol " + value.getAsString() + " does not belong to set
" + symbols.toString());
+    }
+
+    @Override
+    protected Type buildSchema() {
+      return this.elementConverter.schema();
+    }
+
+    @Override
+    JsonSchema getElementSchema() {
+      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING);
+      jsonSchema.setColumnName(this.jsonSchema.getColumnName());
+      return jsonSchema;
+    }
+  }
+
+  public static class RecordConverter extends JsonElementConverter {
+
+    private final HashMap<String, JsonElementConverter> converters;
+    private final RecordType recordType;
+    private final Type schema;
+
+    public enum RecordType {
+      ROOT, CHILD
+    }
+
+    public RecordConverter(JsonSchema recordSchema) {
+      this(recordSchema, CHILD);
+    }
+
+    public RecordConverter(JsonSchema recordSchema, RecordType recordType) {
+      super(recordSchema);
+      this.converters = new HashMap<>();
+      this.recordType = recordType;
+      this.schema = buildSchema();
+    }
+
+    @Override
+    Object convertField(JsonElement value) {
+      ParquetGroup r1 = new ParquetGroup((GroupType) schema());
+      JsonObject inputRecord = value.getAsJsonObject();
+      for (Map.Entry<String, JsonElement> entry : inputRecord.entrySet()) {
+        String key = entry.getKey();
+        JsonElementConverter converter = this.converters.get(key);
+        Object convertedValue = converter.convert(entry.getValue());
+        boolean valueIsNull = convertedValue == null;
+        Type.Repetition repetition = converter.jsonSchema.optionalOrRequired();
+        if (valueIsNull && repetition.equals(OPTIONAL)) {
+          continue;
+        }
+        r1.add(key, convertedValue);
+      }
+      return r1;
+    }
+
+    private Type buildSchema() {
+      JsonArray inputSchema = this.jsonSchema.getDataTypeValues();
+      List<Type> parquetTypes = new ArrayList<>();
+      for (JsonElement element : inputSchema) {
+        JsonObject map = (JsonObject) element;
+        JsonSchema elementSchema = new JsonSchema(map);
+        String columnName = elementSchema.getColumnName();
+        JsonElementConverter converter = JsonElementConversionFactory.getConverter(elementSchema, false);
+        Type schemaType = converter.schema();
+        this.converters.put(columnName, converter);
+        parquetTypes.add(schemaType);
+      }
+      String docName = this.jsonSchema.getColumnName();
+      switch (recordType) {
+        case ROOT:
+          return new MessageType(docName, parquetTypes);
+        case CHILD:
+          return new GroupType(this.jsonSchema.optionalOrRequired(), docName, parquetTypes);
+        default:
+          throw new RuntimeException("Unsupported Record type");
+      }
+    }
+
+    @Override
+    public Type schema() {
+      return this.schema;
+    }
+  }
+
+  public static class MapConverter extends CollectionConverter {
+
+    public MapConverter(JsonSchema mapSchema) {
+      super(mapSchema, mapSchema.getElementTypeUsingKey(MAP_ITEMS_KEY), false);
+    }
+
+    @Override
+    Object convertField(JsonElement value) {
+      ParquetGroup mapGroup = new ParquetGroup((GroupType) schema());
+      JsonElementConverter converter = this.elementConverter;
+      JsonObject map = (JsonObject) value;
+
+      for (Map.Entry<String, JsonElement> entry : map.entrySet()) {
+        ParquetGroup entrySet = (ParquetGroup) mapGroup.addGroup(MAP_KEY);
+        entrySet.add(MAP_KEY_COLUMN_NAME, entry.getKey());
+        entrySet.add(MAP_VALUE_COLUMN_NAME, converter.convert(entry.getValue()));
+      }
+
+      return mapGroup;
+    }
+
+    @Override
+    protected Type buildSchema() {
+      JsonElementConverter elementConverter = this.elementConverter;
+      JsonElementConverter keyConverter = getKeyConverter();
+      GroupType mapGroup =
+          Types.repeatedGroup().addFields(keyConverter.schema(), elementConverter.schema()).named(MAP_KEY)
+              .asGroupType();
+      String columnName = this.jsonSchema.getColumnName();
+      switch (this.jsonSchema.optionalOrRequired()) {
+        case OPTIONAL:
+          return Types.optionalGroup().addFields(mapGroup).named(columnName).asGroupType();
+        case REQUIRED:
+          return Types.requiredGroup().addFields(mapGroup).named(columnName).asGroupType();
+        default:
+          return null;
+      }
+    }
+
+    @Override
+    JsonSchema getElementSchema() {
+      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(this.elementType);
+      jsonSchema.setColumnName(MAP_VALUE_COLUMN_NAME);
+      return jsonSchema;
+    }
+
+    public JsonElementConverter getKeyConverter() {
+      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING);
+      jsonSchema.setColumnName(MAP_KEY_COLUMN_NAME);
+      return getConverter(jsonSchema, false);
+    }
+  }
+}
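
For illustration, a minimal sketch of how a single field converter from the factory above could be obtained and exercised directly (not part of the commit; the class name FieldConverterSketch is hypothetical, and the field definition follows the source-schema format used by the test resource later in this commit):

import org.apache.gobblin.converter.parquet.JsonElementConversionFactory;
import org.apache.gobblin.converter.parquet.JsonElementConversionFactory.JsonElementConverter;
import org.apache.gobblin.converter.parquet.JsonSchema;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class FieldConverterSketch {
  public static void main(String[] args) {
    // A single "int" column definition in the expected source-schema format.
    JsonObject fieldDef = new JsonParser()
        .parse("{\"columnName\": \"a\", \"dataType\": {\"type\": \"int\"}}")
        .getAsJsonObject();

    // getConverter selects IntConverter here; repeated is false because the field is not an array element.
    JsonElementConverter converter =
        JsonElementConversionFactory.getConverter(new JsonSchema(fieldDef), false);

    System.out.println(converter.schema());                               // the Parquet Type for the field
    Object parquetValue = converter.convert(new JsonParser().parse("5")); // an IntegerValue wrapping 5
    System.out.println(parquetValue);
  }
}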

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
new file mode 100644
index 0000000..b04dcf8
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.parquet;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.Converter;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.converter.SingleRecordIterable;
+import org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+
+import parquet.example.data.Group;
+import parquet.schema.MessageType;
+
+import static org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.ROOT;
+
+
+/**
+ * A converter that converts JSON intermediate format records into Parquet {@link Group}s.
+ * @author tilakpatidar
+ */
+public class JsonIntermediateToParquetGroupConverter extends Converter<JsonArray, MessageType, JsonObject, Group> {
+  private RecordConverter recordConverter;
+
+  @Override
+  public MessageType convertSchema(JsonArray inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    String fieldName = workUnit.getExtract().getTable();
+    JsonSchema jsonSchema = new JsonSchema(inputSchema);
+    jsonSchema.setColumnName(fieldName);
+    recordConverter = new RecordConverter(jsonSchema, ROOT);
+    return (MessageType) recordConverter.schema();
+  }
+
+  @Override
+  public Iterable<Group> convertRecord(MessageType outputSchema, JsonObject inputRecord, WorkUnitState workUnit)
+      throws DataConversionException {
+    return new SingleRecordIterable<>((Group) recordConverter.convert(inputRecord));
+  }
+}
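
As a usage illustration only (not part of the commit), the converter above can be driven end to end much like the unit test below does. In a job it would normally be referenced through the job's converter chain configuration (typically the converter.classes property); a direct invocation might look like this sketch, where ConverterUsageSketch is a hypothetical class name:

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.parquet.JsonIntermediateToParquetGroupConverter;
import org.apache.gobblin.source.workunit.Extract;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import parquet.example.data.Group;
import parquet.schema.MessageType;

public class ConverterUsageSketch {
  public static void main(String[] args) throws Exception {
    // The extract's table name becomes the name of the generated Parquet message type.
    SourceState source = new SourceState();
    WorkUnitState workUnit = new WorkUnitState(source.createWorkUnit(
        source.createExtract(Extract.TableType.SNAPSHOT_ONLY, "test_namespace", "test_table")));

    JsonArray sourceSchema = new JsonParser()
        .parse("[{\"columnName\": \"a\", \"dataType\": {\"type\": \"int\"}}]").getAsJsonArray();
    JsonObject record = new JsonParser().parse("{\"a\": 5}").getAsJsonObject();

    JsonIntermediateToParquetGroupConverter converter = new JsonIntermediateToParquetGroupConverter();
    MessageType parquetSchema = converter.convertSchema(sourceSchema, workUnit); // message test_table { ... }
    Group group = converter.convertRecord(parquetSchema, record, workUnit).iterator().next();

    System.out.println(parquetSchema);
    System.out.println(group);
  }
}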

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
new file mode 100644
index 0000000..b7e001b
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.parquet;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.schema.Schema;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+
+import parquet.schema.Type.Repetition;
+
+import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.ENUM;
+import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.RECORD;
+import static parquet.schema.Type.Repetition.OPTIONAL;
+import static parquet.schema.Type.Repetition.REQUIRED;
+
+
+/**
+ * Represents a source schema declared in the configuration with {@link ConfigurationKeys#SOURCE_SCHEMA}.
+ * The source schema is represented by a {@link JsonArray}.
+ * @author tilakpatidar
+ */
+public class JsonSchema extends Schema {
+  public static final String RECORD_FIELDS_KEY = "values";
+  public static final String TYPE_KEY = "type";
+  public static final String ENUM_SYMBOLS_KEY = "symbols";
+  public static final String COLUMN_NAME_KEY = "columnName";
+  public static final String DATA_TYPE_KEY = "dataType";
+  public static final String COMMENT_KEY = "comment";
+  public static final String DEFAULT_VALUE_KEY = "defaultValue";
+  public static final String IS_NULLABLE_KEY = "isNullable";
+  public static final String DEFAULT_RECORD_COLUMN_NAME = "temp";
+  public static final String DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY = "";
+  public static final String ARRAY_KEY = "item";
+  public static final String ARRAY_ITEMS_KEY = "items";
+  public static final String MAP_ITEMS_KEY = "values";
+  public static final String MAP_KEY = "map";
+  public static final String MAP_KEY_COLUMN_NAME = "key";
+  public static final String MAP_VALUE_COLUMN_NAME = "value";
+  private final InputType type;
+
+  public enum InputType {
+    STRING, INT, LONG, FLOAT, DOUBLE, BOOLEAN, ARRAY, ENUM, RECORD, MAP
+  }
+
+  public JsonSchema(JsonArray jsonArray) {
+    JsonObject jsonObject = new JsonObject();
+    JsonObject dataType = new JsonObject();
+    jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
+    dataType.addProperty(TYPE_KEY, RECORD.toString());
+    dataType.add(RECORD_FIELDS_KEY, jsonArray);
+    jsonObject.add(DATA_TYPE_KEY, dataType);
+    setJsonSchemaProperties(jsonObject);
+    this.type = RECORD;
+  }
+
+  public JsonSchema(JsonObject jsonobject) {
+    setJsonSchemaProperties(jsonobject);
+    this.type = InputType.valueOf(getDataType().get(TYPE_KEY).getAsString().toUpperCase());
+  }
+
+  /**
+   * Get source.schema within a {@link InputType#RECORD} type.
+   * The source.schema is represented by a {@link JsonArray}
+   * @return
+   */
+  public JsonArray getDataTypeValues() {
+    if (this.type.equals(RECORD)) {
+      return getDataType().get(RECORD_FIELDS_KEY).getAsJsonArray();
+    }
+    return new JsonArray();
+  }
+
+  /**
+   * Get symbols for a {@link InputType#ENUM} type.
+   * @return
+   */
+  public JsonArray getSymbols() {
+    if (this.type.equals(ENUM)) {
+      return getDataType().get(ENUM_SYMBOLS_KEY).getAsJsonArray();
+    }
+    return new JsonArray();
+  }
+
+  /**
+   * Get {@link InputType} for this {@link JsonSchema}.
+   * @return
+   */
+  public InputType getInputType() {
+    return type;
+  }
+
+  /**
+   * Builds a {@link JsonSchema} object for a given {@link InputType} object.
+   * @param type
+   * @return
+   */
+  public static JsonSchema buildBaseSchema(InputType type) {
+    JsonObject jsonObject = new JsonObject();
+    JsonObject dataType = new JsonObject();
+    jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
+    dataType.addProperty(TYPE_KEY, type.toString());
+    jsonObject.add(DATA_TYPE_KEY, dataType);
+    return new JsonSchema(jsonObject);
+  }
+
+  /**
+   * Get the {@link InputType} of the elements contained within a complex type.
+   * @param itemKey
+   * @return
+   */
+  public InputType getElementTypeUsingKey(String itemKey) {
+    String type = this.getDataType().get(itemKey).getAsString().toUpperCase();
+    return InputType.valueOf(type);
+  }
+
+  /**
+   * Parquet {@link Repetition} for this {@link JsonSchema}.
+   * @return
+   */
+  public Repetition optionalOrRequired() {
+    return this.isNullable() ? OPTIONAL : REQUIRED;
+  }
+
+  /**
+   * Set properties for {@link JsonSchema} from a {@link JsonObject}.
+   * @param jsonObject
+   */
+  private void setJsonSchemaProperties(JsonObject jsonObject) {
+    setColumnName(jsonObject.get(COLUMN_NAME_KEY).getAsString());
+    setDataType(jsonObject.get(DATA_TYPE_KEY).getAsJsonObject());
+    setNullable(jsonObject.has(IS_NULLABLE_KEY) && jsonObject.get(IS_NULLABLE_KEY).getAsBoolean());
+    setComment(getOptionalProperty(jsonObject, COMMENT_KEY));
+    setDefaultValue(getOptionalProperty(jsonObject, DEFAULT_VALUE_KEY));
+  }
+
+  /**
+   * Get optional property from a {@link JsonObject} for a {@link String} key.
+   * If the key doesn't exist, returns {@link #DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY}.
+   * @param jsonObject
+   * @param key
+   * @return
+   */
+  private String getOptionalProperty(JsonObject jsonObject, String key) {
+    return jsonObject.has(key) ? jsonObject.get(key).getAsString() : DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY;
+  }
+}
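
A small sketch of how the JsonSchema helper above behaves, mirroring what the collection converters in JsonElementConversionFactory do when building element schemas (not part of the commit; JsonSchemaSketch is a hypothetical class name, and the column-name accessors are inherited from the Gobblin Schema base class, as the factory code above assumes):

import org.apache.gobblin.converter.parquet.JsonSchema;
import org.apache.gobblin.converter.parquet.JsonSchema.InputType;

public class JsonSchemaSketch {
  public static void main(String[] args) {
    // buildBaseSchema produces a bare, non-nullable schema with the default column name ("temp").
    JsonSchema elementSchema = JsonSchema.buildBaseSchema(InputType.STRING);
    elementSchema.setColumnName("item");

    System.out.println(elementSchema.getInputType());        // STRING
    System.out.println(elementSchema.optionalOrRequired());  // REQUIRED, since isNullable defaults to false
    System.out.println(elementSchema.getColumnName());       // item
  }
}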

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
new file mode 100644
index 0000000..783d845
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.parquet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import parquet.example.data.Group;
+import parquet.example.data.simple.BinaryValue;
+import parquet.example.data.simple.BooleanValue;
+import parquet.example.data.simple.DoubleValue;
+import parquet.example.data.simple.FloatValue;
+import parquet.example.data.simple.Int96Value;
+import parquet.example.data.simple.IntegerValue;
+import parquet.example.data.simple.LongValue;
+import parquet.example.data.simple.NanoTime;
+import parquet.example.data.simple.Primitive;
+import parquet.io.api.Binary;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.GroupType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.Type;
+
+import static parquet.schema.Type.Repetition.REPEATED;
+
+
+/**
+ * A custom implementation of {@link Group} that supports adding an {@link Object} of type {@link Primitive} or {@link Group}.
+ * Also provides methods to add a {@link Primitive} or a {@link Group} by {@link String} key when the field index is not known.
+ * @author tilakpatidar
+ */
+public class ParquetGroup extends Group {
+
+  private final GroupType schema;
+  //each item represents data of a field, which is indexed by the fieldIndex of the schema
+  private final List<Object>[] data;
+
+  public ParquetGroup(GroupType schema) {
+    this.schema = schema;
+    this.data = new List[schema.getFields().size()];
+
+    for (int i = 0; i < schema.getFieldCount(); ++i) {
+      this.data[i] = new ArrayList();
+    }
+  }
+
+  public String toString() {
+    return this.toString("");
+  }
+
+  public String toString(String indent) {
+    StringBuilder result = new StringBuilder();
+    int i = 0;
+    for (Type field : this.schema.getFields()) {
+      String name = field.getName();
+      List<Object> values = this.data[i];
+      for (Object value : values) {
+        result.append(indent).append(name);
+        if (value == null) {
+          result.append(": NULL\n");
+        } else if (value instanceof Group) {
+          result.append("\n").append(((ParquetGroup) value).toString(indent + "  "));
+        } else {
+          result.append(": ").append(value.toString()).append("\n");
+        }
+      }
+      i++;
+    }
+    return result.toString();
+  }
+
+  public Group addGroup(int fieldIndex) {
+    ParquetGroup g = new ParquetGroup(this.schema.getType(fieldIndex).asGroupType());
+    this.data[fieldIndex].add(g);
+    return g;
+  }
+
+  public Group getGroup(int fieldIndex, int index) {
+    return (Group) this.getValue(fieldIndex, index);
+  }
+
+  private Object getValue(int fieldIndex, int index) {
+    List<Object> list;
+    try {
+      list = this.data[fieldIndex];
+    } catch (IndexOutOfBoundsException var6) {
+      throw new RuntimeException(
+          "not found " + fieldIndex + "(" + this.schema.getFieldName(fieldIndex) + ") in
group:\n" + this);
+    }
+
+    try {
+      return list.get(index);
+    } catch (IndexOutOfBoundsException var5) {
+      throw new RuntimeException(
+          "not found " + fieldIndex + "(" + this.schema.getFieldName(fieldIndex) + ") element
number " + index
+              + " in group:\n" + this);
+    }
+  }
+
+  public void add(int fieldIndex, Primitive value) {
+    Type type = this.schema.getType(fieldIndex);
+    List<Object> list = this.data[fieldIndex];
+    if (!type.isRepetition(REPEATED) && !list.isEmpty()) {
+      throw new IllegalStateException(
+          "field " + fieldIndex + " (" + type.getName() + ") can not have more than one value:
" + list);
+    } else {
+      list.add(value);
+    }
+  }
+
+  public int getFieldRepetitionCount(int fieldIndex) {
+    List<Object> list = this.data[fieldIndex];
+    return list == null ? 0 : list.size();
+  }
+
+  public String getValueToString(int fieldIndex, int index) {
+    return String.valueOf(this.getValue(fieldIndex, index));
+  }
+
+  public String getString(int fieldIndex, int index) {
+    return ((BinaryValue) this.getValue(fieldIndex, index)).getString();
+  }
+
+  public int getInteger(int fieldIndex, int index) {
+    return ((IntegerValue) this.getValue(fieldIndex, index)).getInteger();
+  }
+
+  public boolean getBoolean(int fieldIndex, int index) {
+    return ((BooleanValue) this.getValue(fieldIndex, index)).getBoolean();
+  }
+
+  public Binary getBinary(int fieldIndex, int index) {
+    return ((BinaryValue) this.getValue(fieldIndex, index)).getBinary();
+  }
+
+  public Binary getInt96(int fieldIndex, int index) {
+    return ((Int96Value) this.getValue(fieldIndex, index)).getInt96();
+  }
+
+  public void add(int fieldIndex, int value) {
+    this.add(fieldIndex, new IntegerValue(value));
+  }
+
+  public void add(int fieldIndex, long value) {
+    this.add(fieldIndex, new LongValue(value));
+  }
+
+  public void add(int fieldIndex, String value) {
+    this.add(fieldIndex, new BinaryValue(Binary.fromString(value)));
+  }
+
+  public void add(int fieldIndex, NanoTime value) {
+    this.add(fieldIndex, value.toInt96());
+  }
+
+  public void add(int fieldIndex, boolean value) {
+    this.add(fieldIndex, new BooleanValue(value));
+  }
+
+  public void add(int fieldIndex, Binary value) {
+    switch (this.getType().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) {
+      case BINARY:
+        this.add(fieldIndex, new BinaryValue(value));
+        break;
+      case INT96:
+        this.add(fieldIndex, new Int96Value(value));
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            this.getType().asPrimitiveType().getName() + " not supported for Binary");
+    }
+  }
+
+  public void add(int fieldIndex, float value) {
+    this.add(fieldIndex, new FloatValue(value));
+  }
+
+  public void add(int fieldIndex, double value) {
+    this.add(fieldIndex, new DoubleValue(value));
+  }
+
+  public GroupType getType() {
+    return this.schema;
+  }
+
+  public void writeValue(int field, int index, RecordConsumer recordConsumer) {
+    ((Primitive) this.getValue(field, index)).writeValue(recordConsumer);
+  }
+
+  /**
+   * Add an object of {@link Primitive} or {@link Group} type with a {@link String} key.
+   * @param key
+   * @param object
+   */
+  public void add(String key, Object object) {
+    int fieldIndex = getIndex(key);
+    if (object.getClass() == ParquetGroup.class) {
+      this.addGroup(key, (Group) object);
+    } else {
+      this.add(fieldIndex, (Primitive) object);
+    }
+  }
+
+  private int getIndex(String key) {
+    return getType().getFieldIndex(key);
+  }
+
+  /**
+   * Add a {@link Group} given a String key.
+   * @param key
+   * @param object
+   */
+  private void addGroup(String key, Group object) {
+    int fieldIndex = getIndex(key);
+    this.schema.getType(fieldIndex).asGroupType();
+    this.data[fieldIndex].add(object);
+  }
+}
+
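
The string-keyed add(...) above is what the converters rely on when field indices are not known up front. A minimal sketch of using it directly (not part of the commit; ParquetGroupSketch and the schema string are illustrative only):

import org.apache.gobblin.converter.parquet.ParquetGroup;

import parquet.example.data.simple.BinaryValue;
import parquet.example.data.simple.IntegerValue;
import parquet.io.api.Binary;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;

public class ParquetGroupSketch {
  public static void main(String[] args) {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message test_table { required int32 a; required binary e (UTF8); }");

    ParquetGroup group = new ParquetGroup(schema);
    // add(String, Object) resolves the field index by name and expects a Primitive or a ParquetGroup.
    group.add("a", new IntegerValue(5));
    group.add("e", new BinaryValue(Binary.fromString("somestring")));

    System.out.println(group);  // prints "a: 5" and "e: somestring" on separate lines
  }
}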

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
new file mode 100644
index 0000000..c92834b
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter.parquet;
+
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.source.workunit.Extract;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.reflect.TypeToken;
+
+import parquet.example.data.Group;
+import parquet.schema.MessageType;
+
+import static org.testng.Assert.assertEquals;
+
+
+@Test(groups = {"gobblin.converter"})
+public class JsonIntermediateToParquetGroupConverterTest {
+  private static final String RESOURCE_PATH = "/converter/JsonIntermediateToParquetConverter.json";
+  private static JsonObject testCases;
+  private static WorkUnitState workUnit;
+  private static JsonIntermediateToParquetGroupConverter parquetConverter;
+
+  @BeforeClass
+  public static void setUp() {
+    Type listType = new TypeToken<JsonObject>() {
+    }.getType();
+    Gson gson = new Gson();
+    JsonObject testData = gson.fromJson(
+        new InputStreamReader(JsonIntermediateToParquetGroupConverter.class.getResourceAsStream(RESOURCE_PATH)), listType);
+
+    testCases = testData.getAsJsonObject();
+    SourceState source = new SourceState();
+    workUnit = new WorkUnitState(
+        source.createWorkUnit(source.createExtract(Extract.TableType.SNAPSHOT_ONLY, "test_namespace", "test_table")));
+  }
+
+  private void testCase(String testCaseName)
+      throws SchemaConversionException, DataConversionException {
+    JsonObject test = testCases.get(testCaseName).getAsJsonObject();
+    parquetConverter = new JsonIntermediateToParquetGroupConverter();
+
+    MessageType schema = parquetConverter.convertSchema(test.get("schema").getAsJsonArray(), workUnit);
+    Group record =
+        parquetConverter.convertRecord(schema, test.get("record").getAsJsonObject(), workUnit).iterator().next();
+
+    assertEqualsIgnoreSpaces(schema.toString(), test.get("expectedSchema").getAsString());
+    assertEqualsIgnoreSpaces(record.toString(), test.get("expectedRecord").getAsString());
+  }
+
+  @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Symbol .* does not belong to set \\[.*?\\]")
+  public void testEnumTypeBelongsToEnumSet()
+      throws Exception {
+    JsonObject test = testCases.get("enum").getAsJsonObject();
+    parquetConverter = new JsonIntermediateToParquetGroupConverter();
+
+    MessageType schema = parquetConverter.convertSchema(test.get("schema").getAsJsonArray(), workUnit);
+    JsonObject jsonRecord = test.get("record").getAsJsonObject();
+    jsonRecord.addProperty("some_enum", "HELL");
+
+    parquetConverter.convertRecord(schema, jsonRecord, workUnit).iterator().next();
+  }
+
+  @Test
+  public void testPrimitiveTypes()
+      throws Exception {
+    testCase("simplePrimitiveTypes");
+  }
+
+  @Test
+  public void testArrayType()
+      throws Exception {
+    testCase("array");
+  }
+
+  @Test
+  public void testEnumType()
+      throws Exception {
+    testCase("enum");
+  }
+
+  @Test
+  public void testRecordType()
+      throws Exception {
+    testCase("record");
+  }
+
+  @Test
+  public void testMapType()
+      throws Exception {
+    testCase("map");
+  }
+
+  @Test
+  public void testNullValueInOptionalField()
+      throws Exception {
+    testCase("nullValueInOptionalField");
+
+  }
+
+  private void assertEqualsIgnoreSpaces(String actual, String expected) {
+    assertEquals(actual.replaceAll("\\n", ";").replaceAll("\\s|\\t", ""),
+        expected.replaceAll("\\n", ";").replaceAll("\\s|\\t", ""));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json
b/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json
new file mode 100644
index 0000000..e12325d
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json
@@ -0,0 +1,223 @@
+{
+  "simplePrimitiveTypes": {
+    "record": {
+      "a": 5,
+      "b": 5.0,
+      "c": 8.0,
+      "d": true,
+      "e": "somestring"
+    },
+    "schema": [
+      {
+        "columnName": "a",
+        "dataType": {
+          "type": "int"
+        }
+      },
+      {
+        "columnName": "b",
+        "dataType": {
+          "type": "float"
+        }
+      },
+      {
+        "columnName": "c",
+        "dataType": {
+          "type": "double"
+        }
+      },
+      {
+        "columnName": "d",
+        "dataType": {
+          "type": "boolean"
+        }
+      },
+      {
+        "columnName": "e",
+        "dataType": {
+          "type": "string"
+        }
+      }
+    ],
+    "expectedRecord": "a: 5 ; b: 5.0 ; c: 8.0 ; d: true ; e: somestring ; ",
+    "expectedSchema": "message test_table{ ; required int32 a ;  ; required float b ;  ;
required double c ;  ; required boolean d ;  ; required binary e (UTF8) ;  ; } ; "
+  },
+  "array": {
+    "record": {
+      "somearray": [
+        1,
+        2,
+        3
+      ],
+      "somearray1": [
+        1,
+        2,
+        3
+      ],
+      "somearray2": [
+        1.0,
+        2.0,
+        3.0
+      ],
+      "somearray3": [
+        1.0,
+        2.0,
+        3.0
+      ],
+      "somearray4": [
+        true,
+        false,
+        true
+      ],
+      "somearray5": [
+        "hello",
+        "world"
+      ]
+    },
+    "schema": [
+      {
+        "columnName": "somearray",
+        "dataType": {
+          "type": "array",
+          "items": "int"
+        }
+      },
+      {
+        "columnName": "somearray1",
+        "dataType": {
+          "type": "array",
+          "items": "long"
+        }
+      },
+      {
+        "columnName": "somearray2",
+        "dataType": {
+          "type": "array",
+          "items": "float"
+        }
+      },
+      {
+        "columnName": "somearray3",
+        "dataType": {
+          "type": "array",
+          "items": "double"
+        }
+      },
+      {
+        "columnName": "somearray4",
+        "dataType": {
+          "type": "array",
+          "items": "boolean"
+        }
+      },
+      {
+        "columnName": "somearray5",
+        "dataType": {
+          "type": "array",
+          "items": "string"
+        }
+      }
+    ],
+    "expectedRecord": "somearray ;  item:1 ;  item:2 ;  item:3 ; somearray1 ;  item:1 ; 
item:2 ;  item:3 ; somearray2 ;  item:1.0 ;  item:2.0 ;  item:3.0 ; somearray3 ;  item:1.0
;  item:2.0 ;  item:3.0 ; somearray4 ;  item:true ;  item:false ;  item:true ; somearray5
;  item:hello ;  item:world ; ",
+    "expectedSchema": "message test_table {  ;  required group somearray {  ; repeated int32
item ;  ; } ;  required groupsomearray1 {  ; repeated int64 item ;  ; } ;  required groupsomearray2
{  ; repeated float item ;  ; } ;  required groupsomearray3 {  ; repeated double item ;  ;
} ;  required groupsomearray4 {  ; repeated boolean item ;  ; } ;  required groupsomearray5
{  ; repeated binary item(UTF8) ;  ; } ; } ; "
+  },
+  "enum": {
+    "record": {
+      "some_enum": "HELLO"
+    },
+    "schema": [
+      {
+        "columnName": "some_enum",
+        "dataType": {
+          "type": "enum",
+          "symbols": [
+            "HELLO",
+            "WORLD"
+          ]
+        }
+      }
+    ],
+    "expectedRecord": "some_enum : HELLO ;",
+    "expectedSchema": "message test_table { ; required binary some_enum (UTF8) ;; } ;"
+  },
+  "record": {
+    "record": {
+      "some_record": {
+        "name": "me",
+        "age": 22,
+        "some_array": [
+          3,
+          4,
+          5
+        ]
+      }
+    },
+    "schema": [
+      {
+        "columnName": "some_record",
+        "dataType": {
+          "type": "record",
+          "values": [
+            {
+              "columnName": "name",
+              "dataType": {
+                "type": "string"
+              }
+            },
+            {
+              "columnName": "age",
+              "dataType": {
+                "type": "long"
+              }
+            },
+            {
+              "columnName": "some_array",
+              "dataType": {
+                "type": "array",
+                "items": "int"
+              }
+            }
+          ]
+        }
+      }
+    ],
+    "expectedRecord": "some_record ; name:me ; age:22 ; some_array ; item:3 ; item:4 ; item:5
;",
+    "expectedSchema": "message test_table {  ; required group some_record {  ; required binary
name (UTF8) ;  ; required int64 age ;  ; required group some_array {  ; repeated int32 item
; ; } ; } ;  } ; "
+  },
+  "map": {
+    "schema": [
+      {
+        "columnName": "cityToCountry",
+        "dataType": {
+          "type": "map",
+          "values": "string"
+        }
+      }
+    ],
+    "record": {
+      "cityToCountry": {
+        "ny": "US",
+        "london": "UK",
+        "delhi": "India"
+      }
+    },
+    "expectedRecord": "cityToCountry; map; key:ny;value:US; map; key:london;value:UK; map;
key:delhi;value:India;",
+    "expectedSchema": "message test_table {  ;   required groupcityToCountry {  ;  repeated
group map {  ;   required binary key (UTF8) ;   ;   required binary value (UTF8) ;   ;  }
;  } ;  } ;"
+  },
+  "nullValueInOptionalField": {
+    "record": {
+      "a": null
+    },
+    "schema": [
+      {
+        "columnName": "a",
+        "isNullable": true,
+        "dataType": {
+          "type": "int"
+        }
+      }
+    ],
+    "expectedRecord": "",
+    "expectedSchema": "message test_table {; optional int32 a ;; };"
+  }
+}
\ No newline at end of file

