flink-commits mailing list archives

From fhue...@apache.org
Subject flink git commit: [FLINK-7421] [table] Fix serializability of AvroRowSerializationSchema + AvroRowDeserializationSchema.
Date Fri, 03 Nov 2017 09:29:47 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 6714f4a39 -> 4c6b6c29d


[FLINK-7421] [table] Fix serializability of AvroRowSerializationSchema + AvroRowDeserializationSchema.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c6b6c29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c6b6c29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c6b6c29

Branch: refs/heads/release-1.3
Commit: 4c6b6c29d064a021db25feb1db62a122b913d06d
Parents: 6714f4a
Author: Fabian Hueske <fhueske@apache.org>
Authored: Thu Nov 2 21:40:26 2017 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Nov 3 09:51:09 2017 +0100

----------------------------------------------------------------------
 .../AvroRowDeserializationSchema.java           | 42 +++++++++++++++-----
 .../AvroRowSerializationSchema.java             | 41 ++++++++++++++-----
 .../kafka/AvroRowDeSerializationSchemaTest.java | 34 ++++++++++++++--
 3 files changed, 96 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
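The fix applies the standard Java custom-serialization pattern: keep only the Avro record
class as serializable state, mark the non-serializable runtime members (Schema,
DatumReader/DatumWriter, streams, encoder/decoder) transient, and rebuild them in
readObject(). A minimal sketch of that pattern, separate from the commit itself (the
ExampleSchema class and its fields are hypothetical):

    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class ExampleSchema implements Serializable {

        // The only state that is actually written during Java serialization.
        private Class<?> recordClazz;

        // Stand-in for non-serializable members such as Avro's Decoder;
        // rebuilt after deserialization instead of being written out.
        private transient StringBuilder buffer;

        public ExampleSchema(Class<?> recordClazz) {
            this.recordClazz = recordClazz;
            this.buffer = new StringBuilder();
        }

        private void writeObject(ObjectOutputStream oos) throws IOException {
            oos.writeObject(recordClazz);
        }

        private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
            this.recordClazz = (Class<?>) ois.readObject();
            this.buffer = new StringBuilder(); // re-create transient state
        }
    }
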


http://git-wip-us.apache.org/repos/asf/flink/blob/4c6b6c29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
index 37241f5..0cfca5e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
@@ -16,9 +16,9 @@
  */
 package org.apache.flink.streaming.util.serialization;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
@@ -28,8 +28,12 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
 
 /**
  * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
@@ -41,24 +45,29 @@ import org.apache.flink.util.Preconditions;
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
 
 	/**
+	 * Avro record class.
+	 */
+	private Class<? extends SpecificRecord> recordClazz;
+
+	/**
 	 * Schema for deterministic field order.
 	 */
-	private final Schema schema;
+	private transient Schema schema;
 
 	/**
 	 * Reader that deserializes byte array into a record.
 	 */
-	private final DatumReader<SpecificRecord> datumReader;
+	private transient DatumReader<SpecificRecord> datumReader;
 
 	/**
 	 * Input stream to read message from.
 	 */
-	private final MutableByteArrayInputStream inputStream;
+	private transient MutableByteArrayInputStream inputStream;
 
 	/**
 	 * Avro decoder that decodes binary data
 	 */
-	private final Decoder decoder;
+	private transient Decoder decoder;
 
 	/**
 	 * Record to deserialize byte array to.
@@ -72,6 +81,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 	 */
 	public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
 		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.recordClazz = recordClazz;
 		this.schema = SpecificData.get().getSchema(recordClazz);
 		this.datumReader = new SpecificDatumReader<>(schema);
 		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
@@ -94,6 +104,20 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		return (Row) row;
 	}
 
+	private void writeObject(ObjectOutputStream oos) throws IOException {
+		oos.writeObject(recordClazz);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+		this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
+		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.datumReader = new SpecificDatumReader<>(schema);
+		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
+		this.inputStream = new MutableByteArrayInputStream();
+		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+	}
+
 	/**
 	 * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
 	 * Avro's {@link Utf8} fields are converted into regular Java strings.

http://git-wip-us.apache.org/repos/asf/flink/blob/4c6b6c29/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
index 8388ab5..1cda529 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
@@ -17,9 +17,9 @@
  */
 package org.apache.flink.streaming.util.serialization;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -30,8 +30,12 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
 
 /**
  * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes.
@@ -39,24 +43,29 @@ import org.apache.flink.util.Preconditions;
 public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 
 	/**
+	 * Avro record class.
+	 */
+	private Class<? extends SpecificRecord> recordClazz;
+
+	/**
 	 * Avro serialization schema.
 	 */
-	private final Schema schema;
+	private transient Schema schema;
 
 	/**
 	 * Writer to serialize Avro record into a byte array.
 	 */
-	private final DatumWriter<GenericRecord> datumWriter;
+	private transient DatumWriter<GenericRecord> datumWriter;
 
 	/**
 	 * Output stream to serialize records into byte array.
 	 */
-	private final ByteArrayOutputStream arrayOutputStream =  new ByteArrayOutputStream();
+	private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
 
 	/**
 	 * Low-level class for serialization of Avro values.
 	 */
-	private final Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+	private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
 
 	/**
 	 * Creates a Avro serialization schema for the given schema.
@@ -65,6 +74,7 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 	 */
 	public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
 		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.recordClazz = recordClazz;
 		this.schema = SpecificData.get().getSchema(recordClazz);
 		this.datumWriter = new SpecificDatumWriter<>(schema);
 	}
@@ -86,6 +96,19 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 		}
 	}
 
+	private void writeObject(ObjectOutputStream oos) throws IOException {
+		oos.writeObject(recordClazz);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+		this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
+		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.datumWriter = new SpecificDatumWriter<>(schema);
+		this.arrayOutputStream = new ByteArrayOutputStream();
+		this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+	}
+
 	/**
 	 * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
 	 * Strings are converted into Avro's {@link Utf8} fields.

http://git-wip-us.apache.org/repos/asf/flink/blob/4c6b6c29/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
index e13968e..b844e43 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
@@ -17,16 +17,21 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.io.IOException;
-import org.apache.avro.specific.SpecificRecord;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
 import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
 import org.apache.flink.types.Row;
-import static org.junit.Assert.assertEquals;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.avro.specific.SpecificRecord;
+
+import java.io.IOException;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Test for the Avro serialization and deserialization schema.
  */
@@ -117,4 +122,27 @@ public class AvroRowDeSerializationSchemaTest {
 
 		assertEquals(testData.f2, actual);
 	}
+
+	@Test
+	public void testSerializability() throws IOException, ClassNotFoundException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+
+		final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0);
+		final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0);
+
+		byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
+		byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig);
+
+		AvroRowSerializationSchema serCopy =
+			InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader());
+		AvroRowDeserializationSchema deserCopy =
+			InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader());
+
+		final byte[] bytes = serCopy.serialize(testData.f2);
+		deserCopy.deserialize(bytes);
+		deserCopy.deserialize(bytes);
+		final Row actual = deserCopy.deserialize(bytes);
+
+		assertEquals(testData.f2, actual);
+	}
 }
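
For context, these schema classes must be java-serializable because Flink serializes them
into the job graph and ships them to the task managers together with the Kafka source or
sink. Illustrative usage against the Flink 1.3 Kafka 0.10 connector (the topic name, the
properties, and the Avro-generated MyAvroRecord class are hypothetical):

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
    import org.apache.flink.types.Row;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    // The schema instance below is serialized with the job graph; before this
    // fix, that serialization failed on the schema's non-serializable fields.
    FlinkKafkaConsumer010<Row> consumer = new FlinkKafkaConsumer010<>(
        "avro-topic",
        new AvroRowDeserializationSchema(MyAvroRecord.class),
        props);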

