flink-commits mailing list archives

From rmetz...@apache.org
Subject [70/92] [abbrv] prefix all projects in addons and quickstarts with flink-
Date Tue, 22 Jul 2014 10:41:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
new file mode 100644
index 0000000..76b23ef
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.flink.api.avro.DataInputDecoder;
+import org.apache.flink.api.avro.DataOutputEncoder;
+import org.apache.flink.api.java.record.io.avro.generated.Colors;
+import org.apache.flink.api.java.record.io.avro.generated.User;
+import org.apache.flink.util.StringUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
+ */
+public class EncoderDecoderTest {
+	
+	@Test
+	public void testComplexStringsDirectly() {
+		try {
+			Random rnd = new Random(349712539451944123L);
+			
+			for (int i = 0; i < 10; i++) {
+				String testString = StringUtils.getRandomString(rnd, 10, 100);
+				
+				ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+				{
+					DataOutputStream dataOut = new DataOutputStream(baos);
+					DataOutputEncoder encoder = new DataOutputEncoder();
+					encoder.setOut(dataOut);
+					
+					encoder.writeString(testString);
+					dataOut.flush();
+					dataOut.close();
+				}
+				
+				byte[] data = baos.toByteArray();
+				
+				// deserialize
+				{
+					ByteArrayInputStream bais = new ByteArrayInputStream(data);
+					DataInputStream dataIn = new DataInputStream(bais);
+					DataInputDecoder decoder = new DataInputDecoder();
+					decoder.setIn(dataIn);
+	
+					String deserialized = decoder.readString();
+					
+					assertEquals(testString, deserialized);
+				}
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test failed due to an exception: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPrimitiveTypes() {
+		
+		testObjectSerialization(new Boolean(true));
+		testObjectSerialization(new Boolean(false));
+		
+		testObjectSerialization(new Byte((byte) 0));
+		testObjectSerialization(new Byte((byte) 1));
+		testObjectSerialization(new Byte((byte) -1));
+		testObjectSerialization(new Byte(Byte.MIN_VALUE));
+		testObjectSerialization(new Byte(Byte.MAX_VALUE));
+		
+		testObjectSerialization(new Short((short) 0));
+		testObjectSerialization(new Short((short) 1));
+		testObjectSerialization(new Short((short) -1));
+		testObjectSerialization(new Short(Short.MIN_VALUE));
+		testObjectSerialization(new Short(Short.MAX_VALUE));
+		
+		testObjectSerialization(new Integer(0));
+		testObjectSerialization(new Integer(1));
+		testObjectSerialization(new Integer(-1));
+		testObjectSerialization(new Integer(Integer.MIN_VALUE));
+		testObjectSerialization(new Integer(Integer.MAX_VALUE));
+		
+		testObjectSerialization(new Long(0));
+		testObjectSerialization(new Long(1));
+		testObjectSerialization(new Long(-1));
+		testObjectSerialization(new Long(Long.MIN_VALUE));
+		testObjectSerialization(new Long(Long.MAX_VALUE));
+		
+		testObjectSerialization(new Float(0));
+		testObjectSerialization(new Float(1));
+		testObjectSerialization(new Float(-1));
+		testObjectSerialization(new Float((float)Math.E));
+		testObjectSerialization(new Float((float)Math.PI));
+		testObjectSerialization(new Float(Float.MIN_VALUE));
+		testObjectSerialization(new Float(Float.MAX_VALUE));
+		testObjectSerialization(new Float(Float.MIN_NORMAL));
+		testObjectSerialization(new Float(Float.NaN));
+		testObjectSerialization(new Float(Float.NEGATIVE_INFINITY));
+		testObjectSerialization(new Float(Float.POSITIVE_INFINITY));
+		
+		testObjectSerialization(new Double(0));
+		testObjectSerialization(new Double(1));
+		testObjectSerialization(new Double(-1));
+		testObjectSerialization(new Double(Math.E));
+		testObjectSerialization(new Double(Math.PI));
+		testObjectSerialization(new Double(Double.MIN_VALUE));
+		testObjectSerialization(new Double(Double.MAX_VALUE));
+		testObjectSerialization(new Double(Double.MIN_NORMAL));
+		testObjectSerialization(new Double(Double.NaN));
+		testObjectSerialization(new Double(Double.NEGATIVE_INFINITY));
+		testObjectSerialization(new Double(Double.POSITIVE_INFINITY));
+		
+		testObjectSerialization("");
+		testObjectSerialization("abcdefg");
+		testObjectSerialization("ab\u1535\u0155xyz\u706F");
+		
+		testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523));
+		testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001));
+	}
+	
+	@Test
+	public void testArrayTypes() {
+		{
+			int[] array = new int[] {1, 2, 3, 4, 5};
+			testObjectSerialization(array);
+		}
+		{
+			long[] array = new long[] {1, 2, 3, 4, 5};
+			testObjectSerialization(array);
+		}
+		{
+			float[] array = new float[] {1, 2, 3, 4, 5};
+			testObjectSerialization(array);
+		}
+		{
+			double[] array = new double[] {1, 2, 3, 4, 5};
+			testObjectSerialization(array);
+		}
+		{
+			String[] array = new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"};
+			testObjectSerialization(array);
+		}
+	}
+	
+	@Test
+	public void testEmptyArray() {
+		{
+			int[] array = new int[0];
+			testObjectSerialization(array);
+		}
+		{
+			long[] array = new long[0];
+			testObjectSerialization(array);
+		}
+		{
+			float[] array = new float[0];
+			testObjectSerialization(array);
+		}
+		{
+			double[] array = new double[0];
+			testObjectSerialization(array);
+		}
+		{
+			String[] array = new String[0];
+			testObjectSerialization(array);
+		}
+	}
+	
+	@Test
+	public void testObjects() {
+		// simple object containing only primitives
+		{
+			testObjectSerialization(new Book(976243875L, "The Serialization Odyssey", 42));
+		}
+		
+		// object with collection
+		{
+			ArrayList<String> list = new ArrayList<String>();
+			list.add("A");
+			list.add("B");
+			list.add("C");
+			list.add("D");
+			list.add("E");
+			
+			testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym"));
+		}
+		
+		// object with empty collection
+		{
+			ArrayList<String> list = new ArrayList<String>();
+			testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus"));
+		}
+	}
+	
+	@Test
+	public void testNestedObjectsWithCollections() {
+		testObjectSerialization(new ComplexNestedObject2(true));
+	}
+	
+	@Test
+	public void testGeneratedObjectWithNullableFields() {
+		List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" });
+		List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true);
+		Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
+		map.put("1", 1L);
+		map.put("2", 2L);
+		map.put("3", 3L);
+		
+		User user = new User("Freudenreich", 1337, "macintosh gray", 1234567890L, 3.1415926, null, true, strings, bools, null, Colors.GREEN, map);
+		
+		testObjectSerialization(user);
+	}
+	
+	@Test
+	public void testVarLenCountEncoding() {
+		try {
+			long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL};
+			
+			// write
+			ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+			{
+				DataOutputStream dataOut = new DataOutputStream(baos);
+				
+				for (long val : values) {
+					DataOutputEncoder.writeVarLongCount(dataOut, val);
+				}
+				
+				dataOut.flush();
+				dataOut.close();
+			}
+			
+			// read
+			{
+				ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+				DataInputStream dataIn = new DataInputStream(bais);
+				
+				for (long val : values) {
+					long read = DataInputDecoder.readVarLongCount(dataIn);
+					assertEquals("Wrong var-len encoded value read.", val, read);
+				}
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test failed due to an exception: " + e.getMessage());
+		}
+	}
+	
+	private static <X> void testObjectSerialization(X obj) {
+		
+		try {
+			
+			// serialize
+			ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+			{
+				DataOutputStream dataOut = new DataOutputStream(baos);
+				DataOutputEncoder encoder = new DataOutputEncoder();
+				encoder.setOut(dataOut);
+				
+				@SuppressWarnings("unchecked")
+				Class<X> clazz = (Class<X>) obj.getClass();
+				ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz);
+				
+				writer.write(obj, encoder);
+				dataOut.flush();
+				dataOut.close();
+			}
+			
+			byte[] data = baos.toByteArray();
+			X result = null;
+			
+			// deserialize
+			{
+				ByteArrayInputStream bais = new ByteArrayInputStream(data);
+				DataInputStream dataIn = new DataInputStream(bais);
+				DataInputDecoder decoder = new DataInputDecoder();
+				decoder.setIn(dataIn);
+
+				@SuppressWarnings("unchecked")
+				Class<X> clazz = (Class<X>) obj.getClass();
+				ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz);
+				
+				// create a reuse object via the no-arg constructor, if one is available
+				X reuse = null;
+				try {
+					@SuppressWarnings("unchecked")
+					X test = (X) obj.getClass().newInstance();
+					reuse = test;
+				} catch (Throwable t) {}
+				
+				result = reader.read(reuse, decoder);
+			}
+			
+			// check
+			final String message = "Deserialized object is not the same as the original";
+			
+			if (obj.getClass().isArray()) {
+				Class<?> clazz = obj.getClass();
+				if (clazz == byte[].class) {
+					assertArrayEquals(message, (byte[]) obj, (byte[]) result);
+				}
+				else if (clazz == short[].class) {
+					assertArrayEquals(message, (short[]) obj, (short[]) result);
+				}
+				else if (clazz == int[].class) {
+					assertArrayEquals(message, (int[]) obj, (int[]) result);
+				}
+				else if (clazz == long[].class) {
+					assertArrayEquals(message, (long[]) obj, (long[]) result);
+				}
+				else if (clazz == char[].class) {
+					assertArrayEquals(message, (char[]) obj, (char[]) result);
+				}
+				else if (clazz == float[].class) {
+					assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f);
+				}
+				else if (clazz == double[].class) {
+					assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0);
+				} else {
+					assertArrayEquals(message, (Object[]) obj, (Object[]) result);
+				}
+			} else {
+				assertEquals(message, obj, result);
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test failed due to an exception: " + e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Test Objects
+	// --------------------------------------------------------------------------------------------
+
+
+	public static final class SimpleTypes {
+		
+		private final int iVal;
+		private final long lVal;
+		private final byte bVal;
+		private final String sVal;
+		private final short rVal;
+		private final double dVal;
+		
+		
+		public SimpleTypes() {
+			this(0, 0, (byte) 0, "", (short) 0, 0);
+		}
+		
+		public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
+			this.iVal = iVal;
+			this.lVal = lVal;
+			this.bVal = bVal;
+			this.sVal = sVal;
+			this.rVal = rVal;
+			this.dVal = dVal;
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == SimpleTypes.class) {
+				SimpleTypes other = (SimpleTypes) obj;
+				
+				return other.iVal == this.iVal &&
+						other.lVal == this.lVal &&
+						other.bVal == this.bVal &&
+						other.sVal.equals(this.sVal) &&
+						other.rVal == this.rVal &&
+						other.dVal == this.dVal;
+				
+			} else {
+				return false;
+			}
+		}
+	}
+	
+	public static class ComplexNestedObject1 {
+		
+		private double doubleValue;
+		
+		private List<String> stringList;
+		
+		public ComplexNestedObject1() {}
+		
+		public ComplexNestedObject1(int offInit) {
+			this.doubleValue = 6293485.6723 + offInit;
+				
+			this.stringList = new ArrayList<String>();
+			this.stringList.add("A" + offInit);
+			this.stringList.add("somewhat" + offInit);
+			this.stringList.add("random" + offInit);
+			this.stringList.add("collection" + offInit);
+			this.stringList.add("of" + offInit);
+			this.stringList.add("strings" + offInit);
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == ComplexNestedObject1.class) {
+				ComplexNestedObject1 other = (ComplexNestedObject1) obj;
+				return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
+			} else {
+				return false;
+			}
+		}
+	}
+	
+	public static class ComplexNestedObject2 {
+		
+		private long longValue;
+		
+		private Map<String, ComplexNestedObject1> theMap;
+		
+		public ComplexNestedObject2() {}
+		
+		public ComplexNestedObject2(boolean init) {
+			this.longValue = 46547;
+				
+			this.theMap = new HashMap<String, ComplexNestedObject1>();
+			this.theMap.put("36354L", new ComplexNestedObject1(43546543));
+			this.theMap.put("785611L", new ComplexNestedObject1(45784568));
+			this.theMap.put("43L", new ComplexNestedObject1(9876543));
+			this.theMap.put("-45687L", new ComplexNestedObject1(7897615));
+			this.theMap.put("1919876876896L", new ComplexNestedObject1(27154));
+			this.theMap.put("-868468468L", new ComplexNestedObject1(546435));
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == ComplexNestedObject2.class) {
+				ComplexNestedObject2 other = (ComplexNestedObject2) obj;
+				return other.longValue == this.longValue && this.theMap.equals(other.theMap);
+			} else {
+				return false;
+			}
+		}
+	}
+	
+	public static class Book {
+
+		private long bookId;
+		private String title;
+		private long authorId;
+
+		public Book() {}
+
+		public Book(long bookId, String title, long authorId) {
+			this.bookId = bookId;
+			this.title = title;
+			this.authorId = authorId;
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == Book.class) {
+				Book other = (Book) obj;
+				return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
+			} else {
+				return false;
+			}
+		}
+	}
+
+	public static class BookAuthor {
+
+		private long authorId;
+		private List<String> bookTitles;
+		private String authorName;
+
+		public BookAuthor() {}
+
+		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
+			this.authorId = authorId;
+			this.bookTitles = bookTitles;
+			this.authorName = authorName;
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == BookAuthor.class) {
+				BookAuthor other = (BookAuthor) obj;
+				return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
+						other.bookTitles.equals(this.bookTitles);
+			} else {
+				return false;
+			}
+		}
+	}
+}
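
For readers skimming the archive: the round trip exercised by testObjectSerialization above boils down to wiring Avro's reflect-based datum writer/reader to the DataOutputEncoder and DataInputDecoder added in this commit. A minimal self-contained sketch of that pattern (the class name EncoderRoundTripSketch is hypothetical; the encoder/decoder API is the one shown in the test):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.api.avro.DataInputDecoder;
import org.apache.flink.api.avro.DataOutputEncoder;

public class EncoderRoundTripSketch {

	public static void main(String[] args) throws IOException {
		String[] original = { "Oh", "my", "what", "do", "we", "have", "here", "?" };

		// serialize: the Avro reflect writer drives the Flink encoder,
		// which writes through a plain DataOutputStream
		ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
		DataOutputStream dataOut = new DataOutputStream(baos);
		DataOutputEncoder encoder = new DataOutputEncoder();
		encoder.setOut(dataOut);
		new ReflectDatumWriter<String[]>(String[].class).write(original, encoder);
		dataOut.close();

		// deserialize from the same byte buffer; passing null as the reuse
		// object makes the reader allocate a fresh instance
		DataInputDecoder decoder = new DataInputDecoder();
		decoder.setIn(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
		String[] copy = new ReflectDatumReader<String[]>(String[].class).read(null, decoder);

		System.out.println(Arrays.equals(original, copy)); // true
	}
}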

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
new file mode 100644
index 0000000..146c72b
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.avro.testjar;
+
+// ================================================================================================
+//  This file defines the classes for the AvroExternalJarProgramITCase.
+//  The program is exported into src/test/resources/AvroTestProgram.jar.
+//
+//  THIS FILE MUST STAY FULLY COMMENTED OUT SO THAT THE CLASSES DEFINED HERE ARE NOT
+//  COMPILED AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS
+//  LOADING WOULD NOT BE COVERED BY THIS TEST.
+// ================================================================================================
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.flink.api.avro.AvroBaseValue;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.core.fs.Path;
+
+public class AvroExternalJarProgram {
+
+	public static final class Color {
+		
+		private String name;
+		private double saturation;
+		
+		public Color() {
+			name = "";
+			saturation = 1.0;
+		}
+		
+		public Color(String name, double saturation) {
+			this.name = name;
+			this.saturation = saturation;
+		}
+		
+		public String getName() {
+			return name;
+		}
+		
+		public void setName(String name) {
+			this.name = name;
+		}
+		
+		public double getSaturation() {
+			return saturation;
+		}
+		
+		public void setSaturation(double saturation) {
+			this.saturation = saturation;
+		}
+		
+		@Override
+		public String toString() {
+			return name + '(' + saturation + ')';
+		}
+	}
+	
+	public static final class MyUser {
+		
+		private String name;
+		private List<Color> colors;
+		
+		public MyUser() {
+			name = "unknown";
+			colors = new ArrayList<Color>();
+		}
+		
+		public MyUser(String name, List<Color> colors) {
+			this.name = name;
+			this.colors = colors;
+		}
+		
+		public String getName() {
+			return name;
+		}
+		
+		public List<Color> getColors() {
+			return colors;
+		}
+		
+		public void setName(String name) {
+			this.name = name;
+		}
+		
+		public void setColors(List<Color> colors) {
+			this.colors = colors;
+		}
+		
+		@Override
+		public String toString() {
+			return name + " : " + colors;
+		}
+	}
+	
+	
+	public static final class SUser extends AvroBaseValue<MyUser> {
+		
+		static final long serialVersionUID = 1L;
+
+		public SUser() {}
+	
+		public SUser(MyUser u) {
+			super(u);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class NameExtractor extends MapFunction<MyUser, Tuple2<String, MyUser>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<String, MyUser> map(MyUser u) {
+			String namePrefix = u.getName().substring(0, 1);
+			return new Tuple2<String, MyUser>(namePrefix, u);
+		}
+	}
+	
+	public static final class NameGrouper extends ReduceFunction<Tuple2<String, MyUser>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
+			return val1;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Test Data
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class Generator {
+		
+		private final Random rnd = new Random(2389756789345689276L);
+		
+		public MyUser nextUser() {
+			return randomUser();
+		}
+		
+		private MyUser randomUser() {
+			
+			int numColors = rnd.nextInt(5);
+			ArrayList<Color> colors = new ArrayList<Color>(numColors);
+			for (int i = 0; i < numColors; i++) {
+				colors.add(new Color(randomString(), rnd.nextDouble()));
+			}
+			
+			return new MyUser(randomString(), colors);
+		}
+		
+		private String randomString() {
+			char[] c = new char[this.rnd.nextInt(20) + 5];
+			
+			for (int i = 0; i < c.length; i++) {
+				c[i] = (char) (this.rnd.nextInt(150) + 40);
+			}
+			
+			return new String(c);
+		}
+	}
+	
+	public static void writeTestData(File testFile, int numRecords) throws IOException {
+		
+		DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
+		DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
+		
+		dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
+		
+		
+		Generator generator = new Generator();
+		
+		for (int i = 0; i < numRecords; i++) {
+			MyUser user = generator.nextUser();
+			dataFileWriter.append(user);
+		}
+		
+		dataFileWriter.close();
+	}
+
+//	public static void main(String[] args) throws Exception {
+//		String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
+//		writeTestData(new File(testDataFile), 50);
+//	}
+	
+	public static void main(String[] args) throws Exception {
+		String inputPath = args[0];
+		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
+	
+		DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
+		
+		result.output(new DiscardingOuputFormat<Tuple2<String,MyUser>>());
+		env.execute();
+	}
+}
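
writeTestData above only covers the writing side. Reading the file back goes through Avro's standard DataFileReader, which picks the schema up from the file header. A hedged sketch (the class name ReadTestDataSketch and the file path are made up; writeTestData and MyUser are the ones defined above):

import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.reflect.ReflectDatumReader;

public class ReadTestDataSketch {

	public static void main(String[] args) throws IOException {
		File testFile = new File("testdata.avro"); // hypothetical path
		AvroExternalJarProgram.writeTestData(testFile, 50);

		// DataFileReader reads the schema from the file header and hands
		// each record to the reflect-based datum reader
		ReflectDatumReader<AvroExternalJarProgram.MyUser> datumReader =
				new ReflectDatumReader<AvroExternalJarProgram.MyUser>(AvroExternalJarProgram.MyUser.class);
		DataFileReader<AvroExternalJarProgram.MyUser> fileReader =
				new DataFileReader<AvroExternalJarProgram.MyUser>(testFile, datumReader);

		while (fileReader.hasNext()) {
			System.out.println(fileReader.next());
		}
		fileReader.close();
	}
}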

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
new file mode 100644
index 0000000..aa08006
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.io;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.TypeInformation;
+
+public class AvroInputFormatTypeExtractionTest {
+
+	@Test
+	public void testTypeExtraction() {
+		try {
+			InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class);
+			
+			TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format);
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			DataSet<MyAvroType> input = env.createInput(format);
+			TypeInformation<?> typeInfoDataSet = input.getType();
+			
+			
+			Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
+			Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
+			
+			Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
+			Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	public static final class MyAvroType {
+		
+		public String theString;
+		
+		private double aDouble;
+		
+		public double getaDouble() {
+			return aDouble;
+		}
+		
+		public void setaDouble(double aDouble) {
+			this.aDouble = aDouble;
+		}
+		
+		public void setTheString(String theString) {
+			this.theString = theString;
+		}
+		
+		public String getTheString() {
+			return theString;
+		}
+	}
+}
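
The extraction above yields a PojoTypeInfo because MyAvroType satisfies Flink's POJO conditions: a public no-argument constructor (here the implicit default one), and every field either public or reachable through a getter/setter pair. A hedged illustration of the pattern (ExampleType is a made-up name; the rules, not the name, are the point):

public class ExampleType {

	public String publicField;  // public fields are analyzed directly

	private int hidden;         // non-public fields need a getter/setter pair

	public int getHidden() { return hidden; }
	public void setHidden(int value) { this.hidden = value; }

	public ExampleType() {}     // a public no-arg constructor is required
}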

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
new file mode 100644
index 0000000..2387fd6
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.record.io.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat;
+import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.BooleanListValue;
+import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.LongMapValue;
+import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.StringListValue;
+import org.apache.flink.api.java.record.io.avro.generated.Colors;
+import org.apache.flink.api.java.record.io.avro.generated.User;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Tests the Avro input format.
+ * (The test case largely follows the Avro getting-started tutorial:
+ * http://avro.apache.org/docs/current/gettingstartedjava.html)
+ */
+public class AvroRecordInputFormatTest {
+	
+	private File testFile;
+	
+	private final AvroRecordInputFormat format = new AvroRecordInputFormat();
+	final static String TEST_NAME = "Alyssa";
+	
+	final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+	final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+	
+	final static boolean TEST_ARRAY_BOOLEAN_1 = true;
+	final static boolean TEST_ARRAY_BOOLEAN_2 = false;
+	
+	final static Colors TEST_ENUM_COLOR = Colors.GREEN;
+	
+	final static CharSequence TEST_MAP_KEY1 = "KEY 1";
+	final static long TEST_MAP_VALUE1 = 8546456L;
+	final static CharSequence TEST_MAP_KEY2 = "KEY 2";
+	final static long TEST_MAP_VALUE2 = 17554L;
+	
+	
+	@Before
+	public void createFiles() throws IOException {
+		testFile = File.createTempFile("AvroInputFormatTest", null);
+		
+		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+		stringArray.add(TEST_ARRAY_STRING_1);
+		stringArray.add(TEST_ARRAY_STRING_2);
+		
+		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
+		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
+		
+		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
+		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
+		
+		
+		User user1 = new User();
+		user1.setName(TEST_NAME);
+		user1.setFavoriteNumber(256);
+		user1.setTypeDoubleTest(123.45d);
+		user1.setTypeBoolTest(true);
+		user1.setTypeArrayString(stringArray);
+		user1.setTypeArrayBoolean(booleanArray);
+		user1.setTypeEnum(TEST_ENUM_COLOR);
+		user1.setTypeMap(longMap);
+	     
+		// Construct via builder
+		User user2 = User.newBuilder()
+		             .setName("Charlie")
+		             .setFavoriteColor("blue")
+		             .setFavoriteNumber(null)
+		             .setTypeBoolTest(false)
+		             .setTypeDoubleTest(1.337d)
+		             .setTypeNullTest(null)
+		             .setTypeLongTest(1337L)
+		             .setTypeArrayString(new ArrayList<CharSequence>())
+		             .setTypeArrayBoolean(new ArrayList<Boolean>())
+		             .setTypeNullableArray(null)
+		             .setTypeEnum(Colors.RED)
+		             .setTypeMap(new HashMap<CharSequence, Long>())
+		             .build();
+		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
+		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+		dataFileWriter.create(user1.getSchema(), testFile);
+		dataFileWriter.append(user1);
+		dataFileWriter.append(user2);
+		dataFileWriter.close();
+	}
+	
+	@Test
+	public void testDeserialisation() throws IOException {
+		Configuration parameters = new Configuration();
+		format.setFilePath(testFile.toURI().toString());
+		format.configure(parameters);
+		FileInputSplit[] splits = format.createInputSplits(1);
+		Assert.assertEquals(1, splits.length);
+		format.open(splits[0]);
+		Record record = new Record();
+		Assert.assertNotNull(format.nextRecord(record));
+		StringValue name = record.getField(0, StringValue.class);
+		Assert.assertNotNull("empty record", name);
+		Assert.assertEquals("name not equal",name.getValue(), TEST_NAME);
+		
+		// check arrays
+		StringListValue sl = record.getField(7, AvroRecordInputFormat.StringListValue.class);
+		Assert.assertEquals("element 0 not equal", sl.get(0).getValue(), TEST_ARRAY_STRING_1);
+		Assert.assertEquals("element 1 not equal", sl.get(1).getValue(), TEST_ARRAY_STRING_2);
+		
+		BooleanListValue bl = record.getField(8, AvroRecordInputFormat.BooleanListValue.class);
+		Assert.assertEquals("element 0 not equal", bl.get(0).getValue(), TEST_ARRAY_BOOLEAN_1);
+		Assert.assertEquals("element 1 not equal", bl.get(1).getValue(), TEST_ARRAY_BOOLEAN_2);
+		
+		// check enums
+		StringValue enumValue = record.getField(10, StringValue.class);
+		Assert.assertEquals("string representation of enum not equal", enumValue.getValue(), TEST_ENUM_COLOR.toString()); 
+		
+		// check maps
+		LongMapValue lm = record.getField(11, AvroRecordInputFormat.LongMapValue.class);
+		Assert.assertEquals("map value of key 1 not equal", lm.get(new StringValue(TEST_MAP_KEY1)).getValue(), TEST_MAP_VALUE1);
+		Assert.assertEquals("map value of key 2 not equal", lm.get(new StringValue(TEST_MAP_KEY2)).getValue(), TEST_MAP_VALUE2);
+		
+		
+		Assert.assertFalse("expecting second element", format.reachedEnd());
+		Assert.assertNotNull("expecting second element", format.nextRecord(record));
+		
+		Assert.assertNull(format.nextRecord(record));
+		Assert.assertTrue(format.reachedEnd());
+		
+		format.close();
+	}
+	
+	@After
+	public void deleteFiles() {
+		testFile.delete();
+	}
+}
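
testDeserialisation walks the generic InputFormat life cycle: configure, createInputSplits, open, then nextRecord until reachedEnd. A minimal consumption-loop sketch under the same setup as the test (class and method names here are made up; the format API is the one exercised above):

import java.io.IOException;

import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;

public class AvroReadLoopSketch {

	// filePath is a file:// URI to an Avro file like the one built in createFiles()
	public static void readAll(String filePath) throws IOException {
		AvroRecordInputFormat format = new AvroRecordInputFormat();
		format.setFilePath(filePath);
		format.configure(new Configuration());

		FileInputSplit[] splits = format.createInputSplits(1);
		format.open(splits[0]);

		// nextRecord returns null once the split is exhausted
		Record record = new Record();
		while (!format.reachedEnd() && format.nextRecord(record) != null) {
			StringValue name = record.getField(0, StringValue.class);
			System.out.println(name.getValue());
		}
		format.close();
	}
}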

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
new file mode 100644
index 0000000..4cacb7f
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ * 
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.java.record.io.avro.generated;  
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public enum Colors { 
+  RED, GREEN, BLUE  ;
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.java.record.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+}
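
The generated enum carries its own schema in SCHEMA$, so its symbols can be inspected through Avro's standard Schema API. A small sketch (the class name ColorsSchemaSketch is hypothetical):

import org.apache.avro.Schema;

public class ColorsSchemaSketch {

	public static void main(String[] args) {
		Schema schema = Colors.getClassSchema();

		// the parsed schema exposes the enum symbols in declaration order
		System.out.println(schema.getType());        // ENUM
		System.out.println(schema.getEnumSymbols()); // [RED, GREEN, BLUE]
		System.out.println(schema.getFullName());
	}
}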

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
new file mode 100644
index 0000000..61bbe41
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
@@ -0,0 +1,755 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ * 
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.java.record.io.avro.generated;  
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.java.record.io.avro.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]},{\"name\":\"type_double_test\",\"type\":[\"double\"]},{\"name\":\"type_null_test\",\"type\":[\"null\"]},{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\",\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\",\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\",\"values\":\"long\"}}]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+  @Deprecated public java.lang.CharSequence name;
+  @Deprecated public java.lang.Integer favorite_number;
+  @Deprecated public java.lang.CharSequence favorite_color;
+  @Deprecated public java.lang.Long type_long_test;
+  @Deprecated public java.lang.Object type_double_test;
+  @Deprecated public java.lang.Object type_null_test;
+  @Deprecated public java.lang.Object type_bool_test;
+  @Deprecated public java.util.List<java.lang.CharSequence> type_array_string;
+  @Deprecated public java.util.List<java.lang.Boolean> type_array_boolean;
+  @Deprecated public java.util.List<java.lang.CharSequence> type_nullable_array;
+  @Deprecated public org.apache.flink.api.java.record.io.avro.generated.Colors type_enum;
+  @Deprecated public java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use {@link \#newBuilder()}. 
+   */
+  public User() {}
+
+  /**
+   * All-args constructor.
+   */
+  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color, java.lang.Long type_long_test, java.lang.Object type_double_test, java.lang.Object type_null_test, java.lang.Object type_bool_test, java.util.List<java.lang.CharSequence> type_array_string, java.util.List<java.lang.Boolean> type_array_boolean, java.util.List<java.lang.CharSequence> type_nullable_array, org.apache.flink.api.java.record.io.avro.generated.Colors type_enum, java.util.Map<java.lang.CharSequence,java.lang.Long> type_map) {
+    this.name = name;
+    this.favorite_number = favorite_number;
+    this.favorite_color = favorite_color;
+    this.type_long_test = type_long_test;
+    this.type_double_test = type_double_test;
+    this.type_null_test = type_null_test;
+    this.type_bool_test = type_bool_test;
+    this.type_array_string = type_array_string;
+    this.type_array_boolean = type_array_boolean;
+    this.type_nullable_array = type_nullable_array;
+    this.type_enum = type_enum;
+    this.type_map = type_map;
+  }
+
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call. 
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return name;
+    case 1: return favorite_number;
+    case 2: return favorite_color;
+    case 3: return type_long_test;
+    case 4: return type_double_test;
+    case 5: return type_null_test;
+    case 6: return type_bool_test;
+    case 7: return type_array_string;
+    case 8: return type_array_boolean;
+    case 9: return type_nullable_array;
+    case 10: return type_enum;
+    case 11: return type_map;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call. 
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: name = (java.lang.CharSequence)value$; break;
+    case 1: favorite_number = (java.lang.Integer)value$; break;
+    case 2: favorite_color = (java.lang.CharSequence)value$; break;
+    case 3: type_long_test = (java.lang.Long)value$; break;
+    case 4: type_double_test = (java.lang.Object)value$; break;
+    case 5: type_null_test = (java.lang.Object)value$; break;
+    case 6: type_bool_test = (java.lang.Object)value$; break;
+    case 7: type_array_string = (java.util.List<java.lang.CharSequence>)value$; break;
+    case 8: type_array_boolean = (java.util.List<java.lang.Boolean>)value$; break;
+    case 9: type_nullable_array = (java.util.List<java.lang.CharSequence>)value$; break;
+    case 10: type_enum = (org.apache.flink.api.java.record.io.avro.generated.Colors)value$; break;
+    case 11: type_map = (java.util.Map<java.lang.CharSequence,java.lang.Long>)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'name' field.
+   */
+  public java.lang.CharSequence getName() {
+    return name;
+  }
+
+  /**
+   * Sets the value of the 'name' field.
+   * @param value the value to set.
+   */
+  public void setName(java.lang.CharSequence value) {
+    this.name = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_number' field.
+   */
+  public java.lang.Integer getFavoriteNumber() {
+    return favorite_number;
+  }
+
+  /**
+   * Sets the value of the 'favorite_number' field.
+   * @param value the value to set.
+   */
+  public void setFavoriteNumber(java.lang.Integer value) {
+    this.favorite_number = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_color' field.
+   */
+  public java.lang.CharSequence getFavoriteColor() {
+    return favorite_color;
+  }
+
+  /**
+   * Sets the value of the 'favorite_color' field.
+   * @param value the value to set.
+   */
+  public void setFavoriteColor(java.lang.CharSequence value) {
+    this.favorite_color = value;
+  }
+
+  /**
+   * Gets the value of the 'type_long_test' field.
+   */
+  public java.lang.Long getTypeLongTest() {
+    return type_long_test;
+  }
+
+  /**
+   * Sets the value of the 'type_long_test' field.
+   * @param value the value to set.
+   */
+  public void setTypeLongTest(java.lang.Long value) {
+    this.type_long_test = value;
+  }
+
+  /**
+   * Gets the value of the 'type_double_test' field.
+   */
+  public java.lang.Object getTypeDoubleTest() {
+    return type_double_test;
+  }
+
+  /**
+   * Sets the value of the 'type_double_test' field.
+   * @param value the value to set.
+   */
+  public void setTypeDoubleTest(java.lang.Object value) {
+    this.type_double_test = value;
+  }
+
+  /**
+   * Gets the value of the 'type_null_test' field.
+   */
+  public java.lang.Object getTypeNullTest() {
+    return type_null_test;
+  }
+
+  /**
+   * Sets the value of the 'type_null_test' field.
+   * @param value the value to set.
+   */
+  public void setTypeNullTest(java.lang.Object value) {
+    this.type_null_test = value;
+  }
+
+  /**
+   * Gets the value of the 'type_bool_test' field.
+   */
+  public java.lang.Object getTypeBoolTest() {
+    return type_bool_test;
+  }
+
+  /**
+   * Sets the value of the 'type_bool_test' field.
+   * @param value the value to set.
+   */
+  public void setTypeBoolTest(java.lang.Object value) {
+    this.type_bool_test = value;
+  }
+
+  /**
+   * Gets the value of the 'type_array_string' field.
+   */
+  public java.util.List<java.lang.CharSequence> getTypeArrayString() {
+    return type_array_string;
+  }
+
+  /**
+   * Sets the value of the 'type_array_string' field.
+   * @param value the value to set.
+   */
+  public void setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
+    this.type_array_string = value;
+  }
+
+  /**
+   * Gets the value of the 'type_array_boolean' field.
+   */
+  public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
+    return type_array_boolean;
+  }
+
+  /**
+   * Sets the value of the 'type_array_boolean' field.
+   * @param value the value to set.
+   */
+  public void setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
+    this.type_array_boolean = value;
+  }
+
+  /**
+   * Gets the value of the 'type_nullable_array' field.
+   */
+  public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
+    return type_nullable_array;
+  }
+
+  /**
+   * Sets the value of the 'type_nullable_array' field.
+   * @param value the value to set.
+   */
+  public void setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
+    this.type_nullable_array = value;
+  }
+
+  /**
+   * Gets the value of the 'type_enum' field.
+   */
+  public org.apache.flink.api.java.record.io.avro.generated.Colors getTypeEnum() {
+    return type_enum;
+  }
+
+  /**
+   * Sets the value of the 'type_enum' field.
+   * @param value the value to set.
+   */
+  public void setTypeEnum(org.apache.flink.api.java.record.io.avro.generated.Colors value) {
+    this.type_enum = value;
+  }
+
+  /**
+   * Gets the value of the 'type_map' field.
+   */
+  public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
+    return type_map;
+  }
+
+  /**
+   * Sets the value of the 'type_map' field.
+   * @param value the value to set.
+   */
+  public void setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
+    this.type_map = value;
+  }
+
+  /** Creates a new User RecordBuilder */
+  public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder() {
+    return new org.apache.flink.api.java.record.io.avro.generated.User.Builder();
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing Builder */
+  public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.generated.User.Builder other) {
+    return new org.apache.flink.api.java.record.io.avro.generated.User.Builder(other);
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing User instance */
+  public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.generated.User other) {
+    return new org.apache.flink.api.java.record.io.avro.generated.User.Builder(other);
+  }
+  
+  /**
+   * RecordBuilder for User instances.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
+    implements org.apache.avro.data.RecordBuilder<User> {
+
+    private java.lang.CharSequence name;
+    private java.lang.Integer favorite_number;
+    private java.lang.CharSequence favorite_color;
+    private java.lang.Long type_long_test;
+    private java.lang.Object type_double_test;
+    private java.lang.Object type_null_test;
+    private java.lang.Object type_bool_test;
+    private java.util.List<java.lang.CharSequence> type_array_string;
+    private java.util.List<java.lang.Boolean> type_array_boolean;
+    private java.util.List<java.lang.CharSequence> type_nullable_array;
+    private org.apache.flink.api.java.record.io.avro.generated.Colors type_enum;
+    private java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(org.apache.flink.api.java.record.io.avro.generated.User.SCHEMA$);
+    }
+    
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(org.apache.flink.api.java.record.io.avro.generated.User.Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.type_long_test)) {
+        this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
+        fieldSetFlags()[3] = true;
+      }
+      if (isValidValue(fields()[4], other.type_double_test)) {
+        this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
+        fieldSetFlags()[4] = true;
+      }
+      if (isValidValue(fields()[5], other.type_null_test)) {
+        this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
+        fieldSetFlags()[5] = true;
+      }
+      if (isValidValue(fields()[6], other.type_bool_test)) {
+        this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
+        fieldSetFlags()[6] = true;
+      }
+      if (isValidValue(fields()[7], other.type_array_string)) {
+        this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
+        fieldSetFlags()[7] = true;
+      }
+      if (isValidValue(fields()[8], other.type_array_boolean)) {
+        this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
+        fieldSetFlags()[8] = true;
+      }
+      if (isValidValue(fields()[9], other.type_nullable_array)) {
+        this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
+        fieldSetFlags()[9] = true;
+      }
+      if (isValidValue(fields()[10], other.type_enum)) {
+        this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
+        fieldSetFlags()[10] = true;
+      }
+      if (isValidValue(fields()[11], other.type_map)) {
+        this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
+        fieldSetFlags()[11] = true;
+      }
+    }
+    
+    /** Creates a Builder by copying an existing User instance */
+    private Builder(org.apache.flink.api.java.record.io.avro.generated.User other) {
+            super(org.apache.flink.api.java.record.io.avro.generated.User.SCHEMA$);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.type_long_test)) {
+        this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
+        fieldSetFlags()[3] = true;
+      }
+      if (isValidValue(fields()[4], other.type_double_test)) {
+        this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
+        fieldSetFlags()[4] = true;
+      }
+      if (isValidValue(fields()[5], other.type_null_test)) {
+        this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
+        fieldSetFlags()[5] = true;
+      }
+      if (isValidValue(fields()[6], other.type_bool_test)) {
+        this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
+        fieldSetFlags()[6] = true;
+      }
+      if (isValidValue(fields()[7], other.type_array_string)) {
+        this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
+        fieldSetFlags()[7] = true;
+      }
+      if (isValidValue(fields()[8], other.type_array_boolean)) {
+        this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
+        fieldSetFlags()[8] = true;
+      }
+      if (isValidValue(fields()[9], other.type_nullable_array)) {
+        this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
+        fieldSetFlags()[9] = true;
+      }
+      if (isValidValue(fields()[10], other.type_enum)) {
+        this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
+        fieldSetFlags()[10] = true;
+      }
+      if (isValidValue(fields()[11], other.type_map)) {
+        this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
+        fieldSetFlags()[11] = true;
+      }
+    }
+
+    /** Gets the value of the 'name' field */
+    public java.lang.CharSequence getName() {
+      return name;
+    }
+    
+    /** Sets the value of the 'name' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setName(java.lang.CharSequence value) {
+      validate(fields()[0], value);
+      this.name = value;
+      fieldSetFlags()[0] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'name' field has been set */
+    public boolean hasName() {
+      return fieldSetFlags()[0];
+    }
+    
+    /** Clears the value of the 'name' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearName() {
+      name = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'favorite_number' field */
+    public java.lang.Integer getFavoriteNumber() {
+      return favorite_number;
+    }
+    
+    /** Sets the value of the 'favorite_number' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setFavoriteNumber(java.lang.Integer value) {
+      validate(fields()[1], value);
+      this.favorite_number = value;
+      fieldSetFlags()[1] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'favorite_number' field has been set */
+    public boolean hasFavoriteNumber() {
+      return fieldSetFlags()[1];
+    }
+    
+    /** Clears the value of the 'favorite_number' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearFavoriteNumber() {
+      favorite_number = null;
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'favorite_color' field */
+    public java.lang.CharSequence getFavoriteColor() {
+      return favorite_color;
+    }
+    
+    /** Sets the value of the 'favorite_color' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setFavoriteColor(java.lang.CharSequence value) {
+      validate(fields()[2], value);
+      this.favorite_color = value;
+      fieldSetFlags()[2] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'favorite_color' field has been set */
+    public boolean hasFavoriteColor() {
+      return fieldSetFlags()[2];
+    }
+    
+    /** Clears the value of the 'favorite_color' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearFavoriteColor() {
+      favorite_color = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_long_test' field */
+    public java.lang.Long getTypeLongTest() {
+      return type_long_test;
+    }
+    
+    /** Sets the value of the 'type_long_test' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeLongTest(java.lang.Long value) {
+      validate(fields()[3], value);
+      this.type_long_test = value;
+      fieldSetFlags()[3] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_long_test' field has been set */
+    public boolean hasTypeLongTest() {
+      return fieldSetFlags()[3];
+    }
+    
+    /** Clears the value of the 'type_long_test' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeLongTest() {
+      type_long_test = null;
+      fieldSetFlags()[3] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_double_test' field */
+    public java.lang.Object getTypeDoubleTest() {
+      return type_double_test;
+    }
+    
+    /** Sets the value of the 'type_double_test' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeDoubleTest(java.lang.Object value) {
+      validate(fields()[4], value);
+      this.type_double_test = value;
+      fieldSetFlags()[4] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_double_test' field has been set */
+    public boolean hasTypeDoubleTest() {
+      return fieldSetFlags()[4];
+    }
+    
+    /** Clears the value of the 'type_double_test' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeDoubleTest() {
+      type_double_test = null;
+      fieldSetFlags()[4] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_null_test' field */
+    public java.lang.Object getTypeNullTest() {
+      return type_null_test;
+    }
+    
+    /** Sets the value of the 'type_null_test' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeNullTest(java.lang.Object value) {
+      validate(fields()[5], value);
+      this.type_null_test = value;
+      fieldSetFlags()[5] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_null_test' field has been set */
+    public boolean hasTypeNullTest() {
+      return fieldSetFlags()[5];
+    }
+    
+    /** Clears the value of the 'type_null_test' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeNullTest() {
+      type_null_test = null;
+      fieldSetFlags()[5] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_bool_test' field */
+    public java.lang.Object getTypeBoolTest() {
+      return type_bool_test;
+    }
+    
+    /** Sets the value of the 'type_bool_test' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeBoolTest(java.lang.Object value) {
+      validate(fields()[6], value);
+      this.type_bool_test = value;
+      fieldSetFlags()[6] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_bool_test' field has been set */
+    public boolean hasTypeBoolTest() {
+      return fieldSetFlags()[6];
+    }
+    
+    /** Clears the value of the 'type_bool_test' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeBoolTest() {
+      type_bool_test = null;
+      fieldSetFlags()[6] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_array_string' field */
+    public java.util.List<java.lang.CharSequence> getTypeArrayString() {
+      return type_array_string;
+    }
+    
+    /** Sets the value of the 'type_array_string' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
+      validate(fields()[7], value);
+      this.type_array_string = value;
+      fieldSetFlags()[7] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_array_string' field has been set */
+    public boolean hasTypeArrayString() {
+      return fieldSetFlags()[7];
+    }
+    
+    /** Clears the value of the 'type_array_string' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeArrayString() {
+      type_array_string = null;
+      fieldSetFlags()[7] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_array_boolean' field */
+    public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
+      return type_array_boolean;
+    }
+    
+    /** Sets the value of the 'type_array_boolean' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
+      validate(fields()[8], value);
+      this.type_array_boolean = value;
+      fieldSetFlags()[8] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_array_boolean' field has been set */
+    public boolean hasTypeArrayBoolean() {
+      return fieldSetFlags()[8];
+    }
+    
+    /** Clears the value of the 'type_array_boolean' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeArrayBoolean() {
+      type_array_boolean = null;
+      fieldSetFlags()[8] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_nullable_array' field */
+    public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
+      return type_nullable_array;
+    }
+    
+    /** Sets the value of the 'type_nullable_array' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
+      validate(fields()[9], value);
+      this.type_nullable_array = value;
+      fieldSetFlags()[9] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_nullable_array' field has been set */
+    public boolean hasTypeNullableArray() {
+      return fieldSetFlags()[9];
+    }
+    
+    /** Clears the value of the 'type_nullable_array' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeNullableArray() {
+      type_nullable_array = null;
+      fieldSetFlags()[9] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_enum' field */
+    public org.apache.flink.api.java.record.io.avro.generated.Colors getTypeEnum() {
+      return type_enum;
+    }
+    
+    /** Sets the value of the 'type_enum' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeEnum(org.apache.flink.api.java.record.io.avro.generated.Colors value) {
+      validate(fields()[10], value);
+      this.type_enum = value;
+      fieldSetFlags()[10] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_enum' field has been set */
+    public boolean hasTypeEnum() {
+      return fieldSetFlags()[10];
+    }
+    
+    /** Clears the value of the 'type_enum' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeEnum() {
+      type_enum = null;
+      fieldSetFlags()[10] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_map' field */
+    public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
+      return type_map;
+    }
+    
+    /** Sets the value of the 'type_map' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
+      validate(fields()[11], value);
+      this.type_map = value;
+      fieldSetFlags()[11] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_map' field has been set */
+    public boolean hasTypeMap() {
+      return fieldSetFlags()[11];
+    }
+    
+    /** Clears the value of the 'type_map' field */
+    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeMap() {
+      type_map = null;
+      fieldSetFlags()[11] = false;
+      return this;
+    }
+
+    @Override
+    public User build() {
+      try {
+        User record = new User();
+        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
+        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
+        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
+        record.type_long_test = fieldSetFlags()[3] ? this.type_long_test : (java.lang.Long) defaultValue(fields()[3]);
+        record.type_double_test = fieldSetFlags()[4] ? this.type_double_test : (java.lang.Object) defaultValue(fields()[4]);
+        record.type_null_test = fieldSetFlags()[5] ? this.type_null_test : (java.lang.Object) defaultValue(fields()[5]);
+        record.type_bool_test = fieldSetFlags()[6] ? this.type_bool_test : (java.lang.Object) defaultValue(fields()[6]);
+        record.type_array_string = fieldSetFlags()[7] ? this.type_array_string : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[7]);
+        record.type_array_boolean = fieldSetFlags()[8] ? this.type_array_boolean : (java.util.List<java.lang.Boolean>) defaultValue(fields()[8]);
+        record.type_nullable_array = fieldSetFlags()[9] ? this.type_nullable_array : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[9]);
+        record.type_enum = fieldSetFlags()[10] ? this.type_enum : (org.apache.flink.api.java.record.io.avro.generated.Colors) defaultValue(fields()[10]);
+        record.type_map = fieldSetFlags()[11] ? this.type_map : (java.util.Map<java.lang.CharSequence,java.lang.Long>) defaultValue(fields()[11]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+}
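
For reference, a minimal sketch of how this generated builder is used. It
assumes the standard Avro newBuilder() factory, which the full generated
class defines earlier in this file (outside this hunk); in general, fields
whose schema entry lacks a default must be set before build(), otherwise
build() throws an AvroRuntimeException.

    // Hypothetical usage sketch, not part of this patch.
    User user = User.newBuilder()
        .setName("Charlie")                    // required string
        .setFavoriteNumber(42)
        .setFavoriteColor("blue")
        .setTypeLongTest(1337L)
        .setTypeDoubleTest(1.337d)
        .setTypeNullTest(null)                 // ["null"] field accepts null
        .setTypeBoolTest(true)
        .setTypeArrayString(new java.util.ArrayList<CharSequence>())
        .setTypeArrayBoolean(new java.util.ArrayList<Boolean>())
        // type_nullable_array declares "default": null and may stay unset
        .setTypeEnum(Colors.RED)
        .setTypeMap(new java.util.HashMap<CharSequence, Long>())
        .build();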

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/resources/avro/user.avsc b/flink-addons/flink-avro/src/test/resources/avro/user.avsc
new file mode 100644
index 0000000..af3cb75
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/resources/avro/user.avsc
@@ -0,0 +1,19 @@
+
+{"namespace": "org.apache.flink.api.java.record.io.avro.generated",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "type_long_test", "type": ["long", "null"]},
+     {"name": "type_double_test", "type": ["double"]},
+     {"name": "type_null_test", "type": ["null"]},
+     {"name": "type_bool_test", "type": ["boolean"]},
+     {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},  
+     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, 
+     {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
+     {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}},
+     {"name": "type_map", "type": {"type": "map", "values": "long"}} 
+ ]
+}
\ No newline at end of file
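
Note how this schema maps onto the generated Java above: two-branch unions
with "null" become nullable wrapper types, while the single-branch unions
["double"], ["null"] and ["boolean"] are why getTypeDoubleTest(),
getTypeNullTest() and getTypeBoolTest() are typed java.lang.Object rather
than a concrete wrapper. A small sketch of inspecting the schema with the
plain Avro API (the path is illustrative, matching the resource location
in this patch):

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.Schema;

    public class SchemaPeek {
        public static void main(String[] args) throws IOException {
            Schema schema = new Schema.Parser().parse(
                    new File("src/test/resources/avro/user.avsc"));
            // ["null", {"type":"array","items":"string"}] -> nullable array
            System.out.println(schema.getField("type_nullable_array").schema());
        }
    }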

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/resources/testdata.avro
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/resources/testdata.avro b/flink-addons/flink-avro/src/test/resources/testdata.avro
new file mode 100644
index 0000000..45308b9
Binary files /dev/null and b/flink-addons/flink-avro/src/test/resources/testdata.avro differ

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/pom.xml b/flink-addons/flink-hadoop-compatibility/pom.xml
new file mode 100644
index 0000000..8ad1925
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+	
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	
+	<modelVersion>4.0.0</modelVersion>
+	
+	<parent>
+		<artifactId>flink-addons</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>0.6-incubating-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-hadoop-compatibility</artifactId>
+	<name>flink-hadoop-compatibility</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<profiles>
+		<profile>
+			<id>hadoop-2</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop2--><name>hadoop.profile</name><value>2</value>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+					<version>${hadoop.version}</version>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+</project>
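
The hadoop-2 profile above activates when the build is invoked with the
property hadoop.profile set to 2 (for example, mvn clean install
-Dhadoop.profile=2), which adds hadoop-mapreduce-client-core at
${hadoop.version} to this module's dependencies.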

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
new file mode 100644
index 0000000..030d7f2
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
+import org.apache.flink.types.TypeInformation;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private static final Log LOG = LogFactory.getLog(HadoopInputFormat.class);
+	
+	private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
+	private Class<K> keyClass;
+	private Class<V> valueClass;
+	private JobConf jobConf;
+	
+	private transient K key;
+	private transient V value;
+	
+	private transient RecordReader<K, V> recordReader;
+	private transient boolean fetched = false;
+	private transient boolean hasNext;
+	
+	public HadoopInputFormat() {
+		super();
+	}
+	
+	public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+		super();
+		this.mapredInputFormat = mapredInputFormat;
+		this.keyClass = key;
+		this.valueClass = value;
+		HadoopUtils.mergeHadoopConf(job);
+		this.jobConf = job;
+	}
+	
+	public void setJobConf(JobConf job) {
+		this.jobConf = job;
+	}
+	
+	public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() {
+		return mapredInputFormat;
+	}
+	
+	public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat) {
+		this.mapredInputFormat = mapredInputFormat;
+	}
+	
+	public JobConf getJobConf() {
+		return jobConf;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  InputFormat
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void configure(Configuration parameters) {
+		// nothing to do
+	}
+	
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+		// only gather base statistics for FileInputFormats
+		if(!(mapredInputFormat instanceof FileInputFormat)) {
+			return null;
+		}
+		
+		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
+				(FileBaseStatistics) cachedStats : null;
+		
+		try {
+			final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf);
+			
+			return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
+		} catch (IOException ioex) {
+			if (LOG.isWarnEnabled()) {
+				LOG.warn("Could not determine statistics due to an io error: "
+						+ ioex.getMessage());
+			}
+		} catch (Throwable t) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Unexpected problem while getting the file statistics: "
+						+ t.getMessage(), t);
+			}
+		}
+		
+		// no statistics available
+		return null;
+	}
+	
+	@Override
+	public HadoopInputSplit[] createInputSplits(int minNumSplits)
+			throws IOException {
+		org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits);
+		HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
+		for(int i=0;i<splitArray.length;i++){
+			hiSplit[i] = new HadoopInputSplit(splitArray[i], jobConf);
+		}
+		return hiSplit;
+	}
+	
+	@Override
+	public Class<? extends HadoopInputSplit> getInputSplitType() {
+		return HadoopInputSplit.class;
+	}
+	
+	@Override
+	public void open(HadoopInputSplit split) throws IOException {
+		this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
+		key = this.recordReader.createKey();
+		value = this.recordReader.createValue();
+		this.fetched = false;
+	}
+	
+	@Override
+	public boolean reachedEnd() throws IOException {
+		if(!fetched) {
+			fetchNext();
+		}
+		return !hasNext;
+	}
+	
+	private void fetchNext() throws IOException {
+		hasNext = this.recordReader.next(key, value);
+		fetched = true;
+	}
+	
+	@Override
+	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
+		if(!fetched) {
+			fetchNext();
+		}
+		if(!hasNext) {
+			return null;
+		}
+		record.f0 = key;
+		record.f1 = value;
+		fetched = false;
+		return record;
+	}
+	
+	@Override
+	public void close() throws IOException {
+		this.recordReader.close();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Helper methods
+	// --------------------------------------------------------------------------------------------
+	
+	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
+			ArrayList<FileStatus> files) throws IOException {
+		
+		long latestModTime = 0L;
+		
+		// get the file info and check whether the cached statistics are still valid.
+		for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
+			
+			final Path filePath = new Path(hadoopPath.toUri());
+			final FileSystem fs = FileSystem.get(filePath.toUri());
+			
+			final FileStatus file = fs.getFileStatus(filePath);
+			latestModTime = Math.max(latestModTime, file.getModificationTime());
+			
+			// enumerate all files and check their modification time stamp.
+			if (file.isDir()) {
+				FileStatus[] fss = fs.listStatus(filePath);
+				files.ensureCapacity(files.size() + fss.length);
+				
+				for (FileStatus s : fss) {
+					if (!s.isDir()) {
+						files.add(s);
+						latestModTime = Math.max(s.getModificationTime(), latestModTime);
+					}
+				}
+			} else {
+				files.add(file);
+			}
+		}
+		
+		// check whether the cached statistics are still valid, if we have any
+		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
+			return cachedStats;
+		}
+		
+		// calculate the whole length
+		long len = 0;
+		for (FileStatus s : files) {
+			len += s.getLen();
+		}
+		
+		// sanity check
+		if (len <= 0) {
+			len = BaseStatistics.SIZE_UNKNOWN;
+		}
+		
+		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+	
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(mapredInputFormat.getClass().getName());
+		out.writeUTF(keyClass.getName());
+		out.writeUTF(valueClass.getName());
+		jobConf.write(out);
+	}
+	
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		String hadoopInputFormatClassName = in.readUTF();
+		String keyClassName = in.readUTF();
+		String valueClassName = in.readUTF();
+		if(jobConf == null) {
+			jobConf = new JobConf();
+		}
+		jobConf.readFields(in);
+		try {
+			this.mapredInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
+		}
+		try {
+			this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to find key class.", e);
+		}
+		try {
+			this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to find value class.", e);
+		}
+		ReflectionUtils.setConf(mapredInputFormat, jobConf);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  ResultTypeQueryable
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public TypeInformation<Tuple2<K,V>> getProducedType() {
+		return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
+	}
+}
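
A minimal sketch of how this wrapper plugs into the DataSet API of this
Flink version; the input path and the choice of TextInputFormat are
illustrative, not part of the patch:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class HadoopInputExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            JobConf jobConf = new JobConf();
            // Any mapred FileInputFormat works; the path is illustrative.
            org.apache.hadoop.mapred.FileInputFormat.addInputPath(
                    jobConf, new org.apache.hadoop.fs.Path("hdfs:///input/text"));

            // Records arrive as Flink Tuple2 pairs, as built in nextRecord().
            DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                    new HadoopInputFormat<LongWritable, Text>(
                            new TextInputFormat(), LongWritable.class, Text.class, jobConf));

            input.print();
            env.execute();
        }
    }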

