gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-238] Implement EnvelopePayloadExtractor and EnvelopePayloadDeserializer
Date Tue, 12 Sep 2017 01:15:27 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 2485282d2 -> ea5047ea2


[GOBBLIN-238] Implement EnvelopePayloadExtractor and EnvelopePayloadDeserializer

Closes #2099 from zxcware/envelope2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ea5047ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ea5047ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ea5047ea

Branch: refs/heads/master
Commit: ea5047ea2665b7bee99352a7efe1cde625e88047
Parents: 2485282
Author: zhchen <zhchen@linkedin.com>
Authored: Mon Sep 11 18:15:21 2017 -0700
Committer: Hung Tran <hutran@linkedin.com>
Committed: Mon Sep 11 18:15:21 2017 -0700

----------------------------------------------------------------------
 .../converter/BaseEnvelopeSchemaConverter.java  | 139 +++++++++++++++++++
 .../converter/EnvelopePayloadConverter.java     |  97 +++++++++++++
 .../EnvelopePayloadExtractingConverter.java     |  48 +++++++
 .../converter/EnvelopeSchemaConverter.java      |   3 +
 .../converter/EnvelopePayloadConverterTest.java | 103 ++++++++++++++
 .../EnvelopePayloadExtractingConverterTest.java | 103 ++++++++++++++
 .../converter/EnvelopeSchemaConverterTest.java  |   3 +
 .../KafkaAvroSchemaRegistryForTest.java         |   3 +
 .../src/test/resources/converter/envelope.avro  | Bin 0 -> 658 bytes
 .../src/test/resources/converter/envelope.avsc  |  46 ++++++
 .../src/test/resources/converter/record.avsc    |  17 +++
 11 files changed, 562 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java
new file mode 100644
index 0000000..d220902
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistryFactory;
+import org.apache.gobblin.util.AvroUtils;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Base class for envelope converters that use a {@link KafkaSchemaRegistry}.
+ *
+ * <p> An envelope record carries its payload as serialized bytes in the field configured by {@link #PAYLOAD_FIELD},
+ * together with the registry key of the schema those bytes were written with, in the field configured by
+ * {@link #PAYLOAD_SCHEMA_ID_FIELD}. Subclasses deserialize the payload with {@link #upConvertPayload(GenericRecord)},
+ * which resolves it to the latest schema registered for the topic configured by {@link #PAYLOAD_SCHEMA_TOPIC}.
+ *
+ * <p> {@link #fetchLatestPayloadSchema()} must be called before {@link #upConvertPayload(GenericRecord)}.
+ * The datum reader is mutated on every conversion, so an instance must not be shared across threads.
+ *
+ * @param <P> the type the payload is deserialized into
+ */
+public abstract class BaseEnvelopeSchemaConverter<P> extends Converter<Schema, Schema, GenericRecord, GenericRecord> {
+  /** Config key: envelope field (name or path accepted by {@code AvroUtils.getFieldValue}) holding the schema id. */
+  public static final String PAYLOAD_SCHEMA_ID_FIELD = "converter.envelopeSchemaConverter.schemaIdField";
+  /** Config key: envelope field holding the serialized payload bytes. */
+  public static final String PAYLOAD_FIELD = "converter.envelopeSchemaConverter.payloadField";
+  /** Config key (required): topic used to look up the latest payload schema in the registry. */
+  public static final String PAYLOAD_SCHEMA_TOPIC = "converter.envelopeSchemaConverter.payloadSchemaTopic";
+  /** Config key: fully qualified class name of the {@link KafkaSchemaRegistryFactory} to instantiate. */
+  public static final String KAFKA_REGISTRY_FACTORY = "converter.envelopeSchemaConverter.kafkaRegistryFactory";
+
+  public static final String DEFAULT_PAYLOAD_FIELD = "payload";
+  public static final String DEFAULT_PAYLOAD_SCHEMA_ID_FIELD = "payloadSchemaId";
+  public static final String DEFAULT_KAFKA_SCHEMA_REGISTRY_FACTORY_CLASS =
+      "org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory";
+
+  protected String payloadSchemaIdField;
+  protected String payloadField;
+  protected String payloadSchemaTopic;
+  /** Reader whose reader (expected) schema is the latest payload schema; set by {@link #fetchLatestPayloadSchema()}. */
+  protected GenericDatumReader<P> latestPayloadReader;
+  protected KafkaSchemaRegistry registry;
+
+  /**
+   * Reads the converter configuration and creates the schema registry.
+   *
+   * @param workUnit the work unit state holding the converter configuration
+   * @return this converter
+   * @throws RuntimeException if {@link #PAYLOAD_SCHEMA_TOPIC} is not configured or the registry factory
+   *         cannot be instantiated
+   */
+  @Override
+  public BaseEnvelopeSchemaConverter<P> init(WorkUnitState workUnit) {
+    super.init(workUnit);
+
+    payloadSchemaIdField = workUnit.getProp(PAYLOAD_SCHEMA_ID_FIELD, DEFAULT_PAYLOAD_SCHEMA_ID_FIELD);
+    payloadField = workUnit.getProp(PAYLOAD_FIELD, DEFAULT_PAYLOAD_FIELD);
+
+    // Get the schema specific topic to fetch the schema in the registry
+    if (!workUnit.contains(PAYLOAD_SCHEMA_TOPIC)) {
+      throw new RuntimeException("Configuration not found: " + PAYLOAD_SCHEMA_TOPIC);
+    }
+    payloadSchemaTopic = workUnit.getProp(PAYLOAD_SCHEMA_TOPIC);
+
+    String registryFactoryClassName =
+        workUnit.getProp(KAFKA_REGISTRY_FACTORY, DEFAULT_KAFKA_SCHEMA_REGISTRY_FACTORY_CLASS);
+    try {
+      // asSubclass() replaces the unchecked cast; getDeclaredConstructor().newInstance() replaces the deprecated
+      // Class.newInstance(), which would rethrow checked constructor exceptions without declaring them.
+      KafkaSchemaRegistryFactory registryFactory = Class.forName(registryFactoryClassName)
+          .asSubclass(KafkaSchemaRegistryFactory.class)
+          .getDeclaredConstructor()
+          .newInstance();
+      registry = registryFactory.create(workUnit.getProperties());
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Unable to instantiate schema registry factory " + registryFactoryClassName, e);
+    }
+    return this;
+  }
+
+  /**
+   * Looks up the schema the payload of {@code inputRecord} was written with.
+   *
+   * @param inputRecord the input record which has the payload
+   * @return the schema registered under the record's payload schema id
+   * @throws Exception if the record has no schema id or the registry lookup fails
+   */
+  protected Schema getPayloadSchema(GenericRecord inputRecord)
+      throws Exception {
+    Optional<Object> schemaIdValue = AvroUtils.getFieldValue(inputRecord, payloadSchemaIdField);
+    if (!schemaIdValue.isPresent()) {
+      throw new Exception("Schema id with key " + payloadSchemaIdField + " not found in the record");
+    }
+    String schemaKey = String.valueOf(schemaIdValue.get());
+    return (Schema) registry.getSchemaByKey(schemaKey);
+  }
+
+  /**
+   * Copies the payload bytes out of the input record.
+   *
+   * <p> Only the bytes between the buffer's position and limit are returned. The buffer is read through a
+   * duplicate, so its position is left untouched and the record can be read again.
+   *
+   * @param inputRecord the input record which has the payload
+   * @return a copy of the payload bytes in the input record
+   * @throws IllegalArgumentException if the record has no value for the payload field
+   */
+  protected byte[] getPayloadBytes(GenericRecord inputRecord) {
+    ByteBuffer bb = (ByteBuffer) inputRecord.get(payloadField);
+    if (bb == null) {
+      throw new IllegalArgumentException("Payload field " + payloadField + " not found in the record");
+    }
+    // Returning bb.array() directly is wrong whenever the backing array is larger than the payload
+    // (non-zero array offset or position, or a limit below capacity, e.g. a reused or sliced buffer).
+    ByteBuffer view = bb.duplicate();
+    byte[] payloadBytes = new byte[view.remaining()];
+    view.get(payloadBytes);
+    return payloadBytes;
+  }
+
+  /**
+   * Fetches the latest payload schema for the configured topic and recreates {@link #latestPayloadReader}
+   * so that it deserializes into that schema.
+   *
+   * @return the latest payload schema
+   * @throws Exception if the registry lookup fails or returns no schema
+   */
+  protected Schema fetchLatestPayloadSchema() throws Exception {
+    Schema latestPayloadSchema = (Schema) registry.getLatestSchemaByTopic(payloadSchemaTopic);
+    if (latestPayloadSchema == null) {
+      // A reader built with a null schema would silently fall back to each payload's writer schema
+      throw new Exception("Latest payload schema not found for topic " + payloadSchemaTopic);
+    }
+    latestPayloadReader = new GenericDatumReader<>(latestPayloadSchema);
+    return latestPayloadSchema;
+  }
+
+  /**
+   * Convert the payload in the input record to a deserialized object with the latest schema
+   *
+   * @param inputRecord the input record
+   * @return the schema'ed payload object
+   * @throws DataConversionException if the latest schema has not been fetched yet, the writer schema cannot be
+   *         found, or the payload cannot be decoded/resolved to the latest schema
+   */
+  protected P upConvertPayload(GenericRecord inputRecord) throws DataConversionException {
+    try {
+      if (latestPayloadReader == null) {
+        throw new IllegalStateException("Latest payload schema not fetched; call fetchLatestPayloadSchema() first");
+      }
+      Schema payloadSchema = getPayloadSchema(inputRecord);
+      // Set the writer schema; the reader schema remains the latest payload schema
+      latestPayloadReader.setSchema(payloadSchema);
+
+      byte[] payloadBytes = getPayloadBytes(inputRecord);
+      Decoder decoder = DecoderFactory.get().binaryDecoder(payloadBytes, null);
+
+      // 'latestPayloadReader.read' will convert the record from 'payloadSchema' to the latest payload schema
+      return latestPayloadReader.read(null, decoder);
+    } catch (Exception e) {
+      throw new DataConversionException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
new file mode 100644
index 0000000..ca63ac8
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.WorkUnitState;
+
+
+/**
+ * A converter decorates the envelope record with its payload deserialized into schema'ed
object
+ *
+ * <p> Given an envelope schema as the input schema, the output schema will have the
payload
+ * field, configured by key {@value PAYLOAD_FIELD}, set with its latest schema fetched from
a
+ * {@link #registry} (see {@code createDecoratedField(Field)}). The converter copies the
other fields
+ * from the input schema to the output schema
+ *
+ * <p> Given an envelope record as the input record, the output record will have the
payload set
+ * to its deserialized object using the latest schema (see {@code convertPayload(GenericRecord)}).
+ * The converter copies the other fields from the input record to the output record
+ *
+ * <p> If the current payload schema is incompatible with its latest schema, {@code
convertPayload(GenericRecord)}
+ * will throw an exception and the job fail
+ */
+
+public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<GenericRecord>
{
+  public static final String DECORATED_PAYLOAD_DOC = "Decorated payload data";
+
+  @Override
+  public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    List<Field> outputSchemaFields = new ArrayList<>();
+    for (Field field : inputSchema.getFields()) {
+      if (field.name().equals(payloadField)) {
+        // Decorate the field with full schema
+        outputSchemaFields.add(createDecoratedField(field));
+      } else {
+        // Make a copy of the field to the output schema
+        outputSchemaFields.add(new Field(field.name(), field.schema(), field.doc(), field.defaultValue(),
field.order()));
+      }
+    }
+
+    Schema outputSchema = Schema
+        .createRecord(inputSchema.getName(), inputSchema.getDoc(), inputSchema.getNamespace(),
inputSchema.isError());
+    outputSchema.setFields(outputSchemaFields);
+    return outputSchema;
+  }
+
+  /**
+   * Create a payload field with its latest schema fetched from {@link #registry}
+   *
+   * @param field the original payload field from input envelope schema
+   * @return a new payload field with its latest schema
+   */
+  private Field createDecoratedField(Field field) throws SchemaConversionException {
+    try {
+      Schema payloadSchema = fetchLatestPayloadSchema();
+      return new Field(field.name(), payloadSchema, DECORATED_PAYLOAD_DOC, field.defaultValue(),
field.order());
+    } catch (Exception e) {
+      throw new SchemaConversionException(e);
+    }
+  }
+
+  @Override
+  public Iterable<GenericRecord> convertRecord(Schema outputSchema, GenericRecord inputRecord,
WorkUnitState workUnit)
+      throws DataConversionException {
+    GenericRecord outputRecord = new GenericData.Record(outputSchema);
+    for (Field field : inputRecord.getSchema().getFields()) {
+      if (field.name().equals(payloadField)) {
+        outputRecord.put(payloadField, upConvertPayload(inputRecord));
+      } else {
+        outputRecord.put(field.name(), inputRecord.get(field.name()));
+      }
+    }
+    return new SingleRecordIterable<>(outputRecord);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverter.java
new file mode 100644
index 0000000..b759dfd
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * A converter that extracts the payload of an envelope record as a standalone record.
+ *
+ * <ul>
+ *   <li>Input schema: an envelope schema with a field holding the serialized payload bytes (configured by
+ *       {@link #PAYLOAD_FIELD}) and a field holding the schema registry key of the schema those bytes were
+ *       written with (configured by {@link #PAYLOAD_SCHEMA_ID_FIELD}).</li>
+ *   <li>Input record: a record corresponding to the input schema.</li>
+ *   <li>Output schema: the latest schema obtained from the schema registry for the topic configured by
+ *       {@link #PAYLOAD_SCHEMA_TOPIC}; the input schema itself is not inspected.</li>
+ *   <li>Output record: the payload bytes decoded with their writer schema and resolved to the output schema.</li>
+ * </ul>
+ */
+public class EnvelopePayloadExtractingConverter extends BaseEnvelopeSchemaConverter<GenericRecord> {
+  /**
+   * Returns the latest payload schema; also prepares the reader used by {@link #convertRecord}.
+   *
+   * @throws SchemaConversionException if the latest payload schema cannot be fetched
+   */
+  @Override
+  public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
+    try {
+      return fetchLatestPayloadSchema();
+    } catch (Exception e) {
+      throw new SchemaConversionException(e);
+    }
+  }
+
+  /**
+   * Emits exactly one record: the input record's payload up-converted to the latest payload schema.
+   *
+   * @throws DataConversionException if the payload cannot be decoded or resolved to the latest schema
+   */
+  @Override
+  public Iterable<GenericRecord> convertRecord(Schema outputSchema, GenericRecord inputRecord,
+      WorkUnitState workUnit)
+      throws DataConversionException {
+    return new SingleRecordIterable<>(upConvertPayload(inputRecord));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java
index 8696f13..124fe4c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java
@@ -47,7 +47,10 @@ import org.apache.avro.io.DecoderFactory;
  * Input record: record corresponding to input schema
  * Output schema: schema obtained from schema registry using key provided in input record's
{@link #PAYLOAD_SCHEMA_ID_FIELD}
  * Output record: record corresponding to output schema obtained from input record's {@link
#PAYLOAD_FIELD} as bytes
+ *
+ * @deprecated use {@link EnvelopePayloadExtractingConverter}
  */
+@Deprecated
 public class EnvelopeSchemaConverter extends Converter<Schema, String, GenericRecord,
GenericRecord> {
 
   public static final String PAYLOAD_SCHEMA_ID_FIELD = "EnvelopeSchemaConverter.schemaIdField";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadConverterTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadConverterTest.java
new file mode 100644
index 0000000..561dfa0
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadConverterTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Iterables;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit test for {@link EnvelopePayloadConverter}
+ */
+@Test(groups = {"gobblin.converter"})
+public class EnvelopePayloadConverterTest {
+  private static final KafkaSchemaRegistry mockRegistry = mock(KafkaSchemaRegistry.class);
+
+  @Test
+  public void testConverter()
+      throws Exception {
+    Schema inputSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/envelope.avsc"));
+    GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(inputSchema);
+
+    // DataFileReader reads from a File, so copy the classpath resource to a temp file first
+    File tmp = File.createTempFile(getClass().getSimpleName(), null);
+    tmp.deleteOnExit();
+    FileUtils.copyInputStreamToFile(getClass().getResourceAsStream("/converter/envelope.avro"), tmp);
+    GenericRecord inputRecord;
+    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(tmp, datumReader)) {
+      inputRecord = dataFileReader.next();
+    }
+
+    Schema latestPayloadSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/record.avsc"));
+    when(mockRegistry.getLatestSchemaByTopic(any())).thenReturn(latestPayloadSchema);
+    when(mockRegistry.getSchemaByKey(any())).thenReturn(inputSchema.getField("nestedRecord").schema());
+
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.setProp(BaseEnvelopeSchemaConverter.PAYLOAD_SCHEMA_TOPIC, "test");
+    workUnitState.setProp(BaseEnvelopeSchemaConverter.PAYLOAD_SCHEMA_ID_FIELD, "metadata.payloadSchemaId");
+    workUnitState
+        .setProp(BaseEnvelopeSchemaConverter.KAFKA_REGISTRY_FACTORY, MockKafkaAvroSchemaRegistryFactory.class.getName());
+
+    EnvelopePayloadConverter converter = new EnvelopePayloadConverter();
+    converter.init(workUnitState);
+
+    Schema outputSchema = converter.convertSchema(inputSchema, workUnitState);
+    // The payload field must be retyped to the latest payload schema
+    Assert.assertEquals(outputSchema.getField("payload").schema(), latestPayloadSchema);
+
+    List<GenericRecord> outputRecords = new ArrayList<>();
+    Iterables.addAll(outputRecords, converter.convertRecord(outputSchema, inputRecord, workUnitState));
+    Assert.assertEquals(outputRecords.size(), 1);
+
+    GenericRecord outputRecord = outputRecords.get(0);
+    GenericRecord payload = (GenericRecord) outputRecord.get("payload");
+    // While making the test envelope avro record, its nestedRecord was intentionally set to the deserialized payload
+    GenericRecord expectedPayload = (GenericRecord) outputRecord.get("nestedRecord");
+
+    Schema payloadSchema = payload.getSchema();
+    Schema expectedPayloadSchema = expectedPayload.getSchema();
+    // The expected payload schema has the same number of fields as payload schema but in different order
+    Assert.assertEquals(payloadSchema.getName(), expectedPayloadSchema.getName());
+    Assert.assertEquals(payloadSchema.getNamespace(), expectedPayloadSchema.getNamespace());
+    Assert.assertEquals(payloadSchema.getFields().size(), expectedPayloadSchema.getFields().size());
+
+    for (Schema.Field field : payloadSchema.getFields()) {
+      Assert.assertEquals(payload.get(field.name()), expectedPayload.get(field.name()));
+    }
+  }
+
+  /** Registry factory that always hands out {@link #mockRegistry}; instantiated reflectively by the converter. */
+  static class MockKafkaAvroSchemaRegistryFactory extends KafkaAvroSchemaRegistryFactory {
+    @Override
+    public KafkaSchemaRegistry create(Properties props) {
+      return mockRegistry;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverterTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverterTest.java
new file mode 100644
index 0000000..1aa517e
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverterTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Iterables;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit test for {@link EnvelopePayloadExtractingConverter}.
+ */
+@Test(groups = {"gobblin.converter"})
+public class EnvelopePayloadExtractingConverterTest {
+  private static final KafkaSchemaRegistry mockRegistry = mock(KafkaSchemaRegistry.class);
+
+  @Test
+  public void testConverter()
+      throws Exception {
+    Schema inputSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/envelope.avsc"));
+    GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(inputSchema);
+
+    // DataFileReader reads from a File, so copy the classpath resource to a temp file first
+    File tmp = File.createTempFile(getClass().getSimpleName(), null);
+    tmp.deleteOnExit();
+    FileUtils.copyInputStreamToFile(getClass().getResourceAsStream("/converter/envelope.avro"), tmp);
+    GenericRecord inputRecord;
+    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(tmp, datumReader)) {
+      inputRecord = dataFileReader.next();
+    }
+
+    Schema latestPayloadSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/record.avsc"));
+    when(mockRegistry.getLatestSchemaByTopic(any())).thenReturn(latestPayloadSchema);
+    when(mockRegistry.getSchemaByKey(any())).thenReturn(inputSchema.getField("nestedRecord").schema());
+
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.setProp(BaseEnvelopeSchemaConverter.PAYLOAD_SCHEMA_TOPIC, "test");
+    workUnitState.setProp(BaseEnvelopeSchemaConverter.PAYLOAD_SCHEMA_ID_FIELD, "metadata.payloadSchemaId");
+    workUnitState.setProp(BaseEnvelopeSchemaConverter.KAFKA_REGISTRY_FACTORY,
+        EnvelopePayloadExtractingConverterTest.MockKafkaAvroSchemaRegistryFactory.class.getName());
+
+    EnvelopePayloadExtractingConverter converter = new EnvelopePayloadExtractingConverter();
+    converter.init(workUnitState);
+
+    Schema outputSchema = converter.convertSchema(inputSchema, workUnitState);
+    Assert.assertEquals(outputSchema, latestPayloadSchema);
+
+    List<GenericRecord> outputRecords = new ArrayList<>();
+    Iterables.addAll(outputRecords, converter.convertRecord(outputSchema, inputRecord, workUnitState));
+    Assert.assertEquals(outputRecords.size(), 1);
+
+    GenericRecord payload = outputRecords.get(0);
+    // While making the test envelope avro input record, its nestedRecord was intentionally set to the deserialized payload
+    GenericRecord expectedPayload = (GenericRecord) inputRecord.get("nestedRecord");
+
+    Schema payloadSchema = payload.getSchema();
+    Schema expectedPayloadSchema = expectedPayload.getSchema();
+    // The expected payload schema has the same number of fields as payload schema but in different order
+    Assert.assertEquals(payloadSchema.getName(), expectedPayloadSchema.getName());
+    Assert.assertEquals(payloadSchema.getNamespace(), expectedPayloadSchema.getNamespace());
+    Assert.assertEquals(payloadSchema.getFields().size(), expectedPayloadSchema.getFields().size());
+
+    for (Schema.Field field : payloadSchema.getFields()) {
+      Assert.assertEquals(payload.get(field.name()), expectedPayload.get(field.name()));
+    }
+  }
+
+  /** Registry factory that always hands out {@link #mockRegistry}; instantiated reflectively by the converter. */
+  static class MockKafkaAvroSchemaRegistryFactory extends KafkaAvroSchemaRegistryFactory {
+    @Override
+    public KafkaSchemaRegistry create(Properties props) {
+      return mockRegistry;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopeSchemaConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopeSchemaConverterTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopeSchemaConverterTest.java
index a00e2c0..974e7b2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopeSchemaConverterTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopeSchemaConverterTest.java
@@ -31,8 +31,11 @@ import static org.mockito.Mockito.when;
 
 /**
  * Unit test for {@link EnvelopeSchemaConverter}.
+ *
+ * @deprecated As a result of deprecating {@link EnvelopeSchemaConverter}
  */
 @Test(groups = {"gobblin.converter"})
+@Deprecated
 public class EnvelopeSchemaConverterTest {
 
   public static final String SCHEMA_KEY = "testKey";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaAvroSchemaRegistryForTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaAvroSchemaRegistryForTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaAvroSchemaRegistryForTest.java
index ea960db..81ca09a 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaAvroSchemaRegistryForTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaAvroSchemaRegistryForTest.java
@@ -25,7 +25,10 @@ import org.apache.avro.Schema;
 
 /**
  * Override some methods of {@link KafkaAvroSchemaRegistry} for use in {@link EnvelopeSchemaConverterTest}
+ *
+ * @deprecated Check out {@link EnvelopePayloadExtractingConverterTest} for how to mock a {@link KafkaSchemaRegistry}
  */
+@Deprecated
 public class KafkaAvroSchemaRegistryForTest extends KafkaAvroSchemaRegistry {
   public static class Factory implements KafkaSchemaRegistryFactory {
     public Factory() {}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avro
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avro
b/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avro
new file mode 100644
index 0000000..75745b3
Binary files /dev/null and b/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avro
differ

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avsc
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avsc
b/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avsc
new file mode 100644
index 0000000..f579ae5
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avsc
@@ -0,0 +1,46 @@
+{
+      "type": "record",
+      "name": "EnvelopeRecord",
+      "namespace": "org.apache.gobblin.test",
+      "fields": [
+        {
+          "name": "metadata",
+          "type": {
+            "type": "map",
+            "values": "string"
+          },
+          "doc": "record metadata."
+        },
+        {
+          "name": "key",
+          "type": "bytes",
+          "doc": "serialized key."
+        },
+        {
+          "name": "payload",
+          "type": "bytes",
+          "doc": "serialized payload data."
+        },
+    {
+      "name": "nestedRecord",
+      "type": {
+        "type": "record",
+        "name": "SimpleRecord",
+        "namespace": "org.apache.gobblin.test",
+        "fields": [
+          {
+            "name": "id",
+            "type": "string",
+            "doc": "ID of the record."
+          },
+          {
+            "name": "created",
+            "type": "long",
+            "doc": "a time stamp."
+          }
+        ]
+      },
+      "doc": "nested record"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/record.avsc
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/record.avsc
b/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/record.avsc
new file mode 100644
index 0000000..eeb12dd
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/record.avsc
@@ -0,0 +1,17 @@
+{
+  "type": "record",
+  "name": "SimpleRecord",
+  "namespace": "org.apache.gobblin.test",
+  "fields": [
+    {
+      "name": "created",
+      "type": "long",
+      "doc": "a time stamp."
+    },
+    {
+      "name": "id",
+      "type": "string",
+      "doc": "ID of the record."
+    }
+  ]
+}
\ No newline at end of file


Mime
View raw message