flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat
Date Fri, 01 Sep 2017 08:50:42 GMT
Repository: flink
Updated Branches:
  refs/heads/master d8d061b88 -> 391e39bc7


[FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/391e39bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/391e39bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/391e39bc

Branch: refs/heads/master
Commit: 391e39bc7647625c7ea59b5c3ce2cf23e891c550
Parents: d8d061b
Author: Vishnu Viswanath <vishnu.viswanath25@gmail.com>
Authored: Mon Jul 31 00:58:28 2017 -0400
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Sep 1 10:44:53 2017 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/AvroOutputFormat.java     |  7 ++++
 .../flink/api/java/io/AvroOutputFormatTest.java | 43 ++++++++++++++++++++
 2 files changed, 50 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/391e39bc/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index aed40bf..5da8f75 100644
--- a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
@@ -134,6 +135,12 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 			} catch (InstantiationException | IllegalAccessException e) {
 				throw new RuntimeException(e.getMessage());
 			}
+		} else if (org.apache.avro.generic.GenericRecord.class.isAssignableFrom(avroValueType)) {
+			if (userDefinedSchema == null) {
+				throw new IllegalStateException("Schema must be set when using Generic Record");
+			}
+			datumWriter = new GenericDatumWriter<E>(userDefinedSchema);
+			schema = userDefinedSchema;
 		} else {
 			datumWriter = new ReflectDatumWriter<E>(avroValueType);
 			schema = ReflectData.get().getSchema(avroValueType);

http://git-wip-us.apache.org/repos/asf/flink/blob/391e39bc/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
index 87334a7..71ebd78 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
@@ -24,6 +24,10 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
@@ -35,6 +39,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
 import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -151,4 +156,42 @@ public class AvroOutputFormatTest {
 		}
 		outputFormat.close();
 	}
+
+	@Test
+	public void testGenericRecord() throws IOException {
+		final Path outputPath = new Path(File.createTempFile("avro-output-file", "generic.avro").getAbsolutePath());
+		final AvroOutputFormat<GenericRecord> outputFormat = new AvroOutputFormat<>(outputPath, GenericRecord.class);
+		Schema schema = new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"user\", \"fields\": [{\"name\":\"user_name\", \"type\":\"string\"}, {\"name\":\"favorite_number\", \"type\":\"int\"}, {\"name\":\"favorite_color\", \"type\":\"string\"}]}");
+		outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		outputFormat.setSchema(schema);
+		output(outputFormat, schema);
+
+		GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+		DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File(outputPath.getPath()), reader);
+
+		while (dataFileReader.hasNext()) {
+			GenericRecord record = dataFileReader.next();
+			assertEquals(record.get("user_name").toString(), "testUser");
+			assertEquals(record.get("favorite_number"), 1);
+			assertEquals(record.get("favorite_color").toString(), "blue");
+		}
+
+		//cleanup
+		FileSystem fs = FileSystem.getLocalFileSystem();
+		fs.delete(outputPath, false);
+
+	}
+
+	private void output(final AvroOutputFormat<GenericRecord> outputFormat, Schema schema) throws IOException {
+		outputFormat.configure(new Configuration());
+		outputFormat.open(1, 1);
+		for (int i = 0; i < 100; i++) {
+			GenericRecord record = new GenericData.Record(schema);
+			record.put("user_name", "testUser");
+			record.put("favorite_number", 1);
+			record.put("favorite_color", "blue");
+			outputFormat.writeRecord(record);
+		}
+		outputFormat.close();
+	}
 }


Mime
View raw message