flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [25/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:42:03 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
new file mode 100644
index 0000000..1174786
--- /dev/null
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.avro.testjar;
+
+// ================================================================================================
+//  This file defines the classes for the AvroExternalJarProgramITCase.
+//  The program is exported into src/test/resources/AvroTestProgram.jar.
+//
+//  THIS FILE MUST STAY FULLY COMMENTED OUT SO THAT THE CLASSES DEFINED HERE ARE NOT COMPILED
+//  AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, EXTERNAL CLASS LOADING WILL
+//  NOT BE COVERED BY THIS TEST.
+// ================================================================================================
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.core.fs.Path;
+
+public class AvroExternalJarProgram  {
+
+	public static final class Color {
+		// Mutable value holder for a color name and a saturation value; the element type of MyUser's color list.
+		private String name;
+		private double saturation;
+		// No-arg constructor: starts with an empty name and a saturation of 1.0.
+		public Color() {
+			name = "";
+			saturation = 1.0;
+		}
+		// Stores both arguments as given, without validation.
+		public Color(String name, double saturation) {
+			this.name = name;
+			this.saturation = saturation;
+		}
+		
+		public String getName() {
+			return name;
+		}
+		
+		public void setName(String name) {
+			this.name = name;
+		}
+		
+		public double getSaturation() {
+			return saturation;
+		}
+		
+		public void setSaturation(double saturation) {
+			this.saturation = saturation;
+		}
+		// Formats as name(saturation), for example "red(0.5)".
+		@Override
+		public String toString() {
+			return name + '(' + saturation + ')';
+		}
+	}
+	
+	public static final class MyUser {
+		// Mutable record type: a user name plus a list of Color entries. writeTestData() writes it via Avro's reflect writer.
+		private String name;
+		private List<Color> colors;
+		// No-arg constructor: name "unknown" and an empty, mutable color list.
+		public MyUser() {
+			name = "unknown";
+			colors = new ArrayList<Color>();
+		}
+		// Stores the given list by reference (no defensive copy), so later changes by the caller are visible here.
+		public MyUser(String name, List<Color> colors) {
+			this.name = name;
+			this.colors = colors;
+		}
+		
+		public String getName() {
+			return name;
+		}
+		
+		public List<Color> getColors() {
+			return colors;
+		}
+		
+		public void setName(String name) {
+			this.name = name;
+		}
+		
+		public void setColors(List<Color> colors) {
+			this.colors = colors;
+		}
+		// Formats as "<name> : <colors>", using the list's own toString().
+		@Override
+		public String toString() {
+			return name + " : " + colors;
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> {
+		private static final long serialVersionUID = 1L;
+		// Pairs each user with the first character of its name; an empty name yields the key "" instead of throwing.
+		@Override
+		public Tuple2<String, MyUser> map(MyUser u) {
+			String name = u.getName();
+			return new Tuple2<String, MyUser>(name.isEmpty() ? name : name.substring(0, 1), u);
+		}
+	}
+	
+	public static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> {
+		private static final long serialVersionUID = 1L;
+		// Always returns the first argument unchanged; the second one is ignored.
+		@Override
+		public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
+			return val1;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Test Data
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class Generator {
+		// Produces MyUser test records from a Random with a fixed seed, so every instance yields the same sequence.
+		private final Random rnd = new Random(2389756789345689276L);
+		// Public entry point; simply delegates to randomUser().
+		public MyUser nextUser() {
+			return randomUser();
+		}
+		
+		private MyUser randomUser() {
+			// 0 to 4 colors per user. The order of the rnd calls below determines the generated data.
+			int numColors = rnd.nextInt(5);
+			ArrayList<Color> colors = new ArrayList<Color>(numColors);
+			for (int i = 0; i < numColors; i++) {
+				colors.add(new Color(randomString(), rnd.nextDouble()));
+			}
+			
+			return new MyUser(randomString(), colors);
+		}
+		// Builds a string of 5 to 24 chars, each with a code between 40 and 189 (so not limited to ASCII).
+		private String randomString() {
+			char[] c = new char[this.rnd.nextInt(20) + 5];
+			
+			for (int i = 0; i < c.length; i++) {
+				c[i] = (char) (this.rnd.nextInt(150) + 40);
+			}
+			
+			return new String(c);
+		}
+	}
+	
+	/** Writes {@code numRecords} generated MyUser records to {@code testFile} as an Avro data file. */
+	public static void writeTestData(File testFile, int numRecords) throws IOException {
+		// The schema is derived reflectively from MyUser, for both the datum writer and the file header.
+		DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
+		DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
+		dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
+		
+		// From here on the writer holds the open file, so close it on every path, including a failed append.
+		try {
+			Generator generator = new Generator();
+			for (int i = 0; i < numRecords; i++) {
+				dataFileWriter.append(generator.nextUser());
+			}
+		} finally {
+			dataFileWriter.close();
+		}
+	}
+
+//	public static void main(String[] args) throws Exception {
+//		String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
+//		writeTestData(new File(testDataFile), 50);
+//	}
+	
+	public static void main(String[] args) throws Exception {
+		String inputPath = args[0]; // path of the Avro input file; the argument is not validated
+		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		// Read MyUser records from the Avro file.
+		DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
+		// Key each user with NameExtractor, group on tuple field 0 and apply NameGrouper as the reduce function.
+		DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
+		// The result goes to a DiscardingOutputFormat; env.execute() then runs the program.
+		result.output(new DiscardingOutputFormat<Tuple2<String,MyUser>>());
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
new file mode 100644
index 0000000..d8d8b46
--- /dev/null
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the avro input format.
+ * (The testcase is mostly the getting started tutorial of avro)
+ * http://avro.apache.org/docs/current/gettingstartedjava.html
+ */
+public class AvroRecordInputFormatTest {
+	
+	private File testFile;
+	
+	final static String TEST_NAME = "Alyssa";
+	
+	final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+	final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+	
+	final static boolean TEST_ARRAY_BOOLEAN_1 = true;
+	final static boolean TEST_ARRAY_BOOLEAN_2 = false;
+	
+	final static Colors TEST_ENUM_COLOR = Colors.GREEN;
+	
+	final static String TEST_MAP_KEY1 = "KEY 1";
+	final static long TEST_MAP_VALUE1 = 8546456L;
+	final static String TEST_MAP_KEY2 = "KEY 2";
+	final static long TEST_MAP_VALUE2 = 17554L;
+
+	@Before
+	public void createFiles() throws IOException {
+		// Write two User records to a fresh temp file: user1 is built with setters, user2 with the builder.
+		testFile = File.createTempFile("AvroInputFormatTest", null);
+		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+		stringArray.add(TEST_ARRAY_STRING_1);
+		stringArray.add(TEST_ARRAY_STRING_2);
+		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
+		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
+		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
+		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
+		
+		// Construct via setters; fields that are not set here keep their null default.
+		User user1 = new User();
+		user1.setName(TEST_NAME);
+		user1.setFavoriteNumber(256);
+		user1.setTypeDoubleTest(123.45d);
+		user1.setTypeBoolTest(true);
+		user1.setTypeArrayString(stringArray);
+		user1.setTypeArrayBoolean(booleanArray);
+		user1.setTypeEnum(TEST_ENUM_COLOR);
+		user1.setTypeMap(longMap);
+		// Construct via builder
+		User user2 = User.newBuilder()
+		             .setName("Charlie")
+		             .setFavoriteColor("blue")
+		             .setFavoriteNumber(null)
+		             .setTypeBoolTest(false)
+		             .setTypeDoubleTest(1.337d)
+		             .setTypeNullTest(null)
+		             .setTypeLongTest(1337L)
+		             .setTypeArrayString(new ArrayList<CharSequence>())
+		             .setTypeArrayBoolean(new ArrayList<Boolean>())
+		             .setTypeNullableArray(null)
+		             .setTypeEnum(Colors.RED)
+		             .setTypeMap(new HashMap<CharSequence, Long>())
+		             .build();
+		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
+		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+		dataFileWriter.create(user1.getSchema(), testFile);
+		try {
+			dataFileWriter.append(user1);
+			dataFileWriter.append(user2);
+		} finally { // release the file handle even if an append fails
+			dataFileWriter.close();
+		}
+	}
+	
+	@Test
+	public void testDeserialisation() throws IOException {
+		// Reads the file written by createFiles() back through AvroInputFormat and checks the first record field by field.
+		Configuration parameters = new Configuration();
+		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		format.configure(parameters);
+		FileInputSplit[] splits = format.createInputSplits(1);
+		assertEquals(1, splits.length);
+		format.open(splits[0]);
+		try {
+			User u = format.nextRecord(null);
+			assertNotNull(u);
+			
+			String name = u.getName().toString();
+			assertNotNull("empty record", name);
+			assertEquals("name not equal", TEST_NAME, name);
+			
+			// check arrays
+			List<CharSequence> sl = u.getTypeArrayString();
+			assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+			assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+			
+			List<Boolean> bl = u.getTypeArrayBoolean();
+			assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+			assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+			
+			// check enums
+			Colors enumValue = u.getTypeEnum();
+			assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+			
+			// check maps
+			Map<CharSequence, Long> lm = u.getTypeMap();
+			assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+			assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+			
+			assertFalse("expecting second element", format.reachedEnd());
+			assertNotNull("expecting second element", format.nextRecord(u));
+			
+			assertNull(format.nextRecord(u));
+			assertTrue(format.reachedEnd());
+		} finally { // close the input format even when an assertion above fails
+			format.close();
+		}
+	}
+	
+	@After
+	public void deleteFiles() {
+		testFile.delete(); // best-effort cleanup of the temp file; the boolean result is ignored
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
new file mode 100644
index 0000000..58e1f5c
--- /dev/null
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ * 
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.generated;  
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public enum Colors { // Avro-generated enum with the symbols RED, GREEN and BLUE, declared in that order.
+  RED, GREEN, BLUE  ;
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } // Returns the schema parsed from the JSON literal above.
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
new file mode 100644
index 0000000..505857e
--- /dev/null
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ * 
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.generated;  
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]},{\"name\":\"type_double_test\",\"type\":[\"double\"]},{\"name\":\"type_null_test\",\"type\":[\"null\"]},{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\",\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\",\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\",\"values\":\"long\"}}]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } // Static accessor for the record schema held in SCHEMA$.
+  @Deprecated public java.lang.CharSequence name;
+  @Deprecated public java.lang.Integer favorite_number;
+  @Deprecated public java.lang.CharSequence favorite_color;
+  @Deprecated public java.lang.Long type_long_test;
+  @Deprecated public java.lang.Object type_double_test;
+  @Deprecated public java.lang.Object type_null_test;
+  @Deprecated public java.lang.Object type_bool_test;
+  @Deprecated public java.util.List<java.lang.CharSequence> type_array_string;
+  @Deprecated public java.util.List<java.lang.Boolean> type_array_boolean;
+  @Deprecated public java.util.List<java.lang.CharSequence> type_nullable_array;
+  @Deprecated public org.apache.flink.api.io.avro.generated.Colors type_enum;
+  @Deprecated public java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use {@link #newBuilder()}.
+   */
+  public User() {}
+
+  /**
+   * All-args constructor: assigns every argument to the same-named field as given (no copying or validation); the parameter order follows the schema's field order.
+   */
+  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color, java.lang.Long type_long_test, java.lang.Object type_double_test, java.lang.Object type_null_test, java.lang.Object type_bool_test, java.util.List<java.lang.CharSequence> type_array_string, java.util.List<java.lang.Boolean> type_array_boolean, java.util.List<java.lang.CharSequence> type_nullable_array, org.apache.flink.api.io.avro.generated.Colors type_enum, java.util.Map<java.lang.CharSequence,java.lang.Long> type_map) {
+    this.name = name;
+    this.favorite_number = favorite_number;
+    this.favorite_color = favorite_color;
+    this.type_long_test = type_long_test;
+    this.type_double_test = type_double_test;
+    this.type_null_test = type_null_test;
+    this.type_bool_test = type_bool_test;
+    this.type_array_string = type_array_string;
+    this.type_array_boolean = type_array_boolean;
+    this.type_nullable_array = type_nullable_array;
+    this.type_enum = type_enum;
+    this.type_map = type_map;
+  }
+
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; } // Instance-level accessor; returns the same SCHEMA$ constant as getClassSchema().
+  // Used by DatumWriter.  Applications should not call.  Returns the field at position field$ (0-11, in schema field order); any other index throws AvroRuntimeException("Bad index").
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return name;
+    case 1: return favorite_number;
+    case 2: return favorite_color;
+    case 3: return type_long_test;
+    case 4: return type_double_test;
+    case 5: return type_null_test;
+    case 6: return type_bool_test;
+    case 7: return type_array_string;
+    case 8: return type_array_boolean;
+    case 9: return type_nullable_array;
+    case 10: return type_enum;
+    case 11: return type_map;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.  Casts value$ to the declared type of the field at position field$ (0-11) and stores it; any other index throws AvroRuntimeException("Bad index").
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: name = (java.lang.CharSequence)value$; break;
+    case 1: favorite_number = (java.lang.Integer)value$; break;
+    case 2: favorite_color = (java.lang.CharSequence)value$; break;
+    case 3: type_long_test = (java.lang.Long)value$; break;
+    case 4: type_double_test = (java.lang.Object)value$; break;
+    case 5: type_null_test = (java.lang.Object)value$; break;
+    case 6: type_bool_test = (java.lang.Object)value$; break;
+    case 7: type_array_string = (java.util.List<java.lang.CharSequence>)value$; break;
+    case 8: type_array_boolean = (java.util.List<java.lang.Boolean>)value$; break;
+    case 9: type_nullable_array = (java.util.List<java.lang.CharSequence>)value$; break;
+    case 10: type_enum = (org.apache.flink.api.io.avro.generated.Colors)value$; break;
+    case 11: type_map = (java.util.Map<java.lang.CharSequence,java.lang.Long>)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'name' field (schema type "string"); the stored reference is returned as is and may be null.
+   */
+  public java.lang.CharSequence getName() {
+    return name;
+  }
+
+  /**
+   * Sets the value of the 'name' field (schema type "string").
+   * @param value the value to set; stored as given, without copying or validation.
+   */
+  public void setName(java.lang.CharSequence value) {
+    this.name = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_number' field (schema union ["int","null"]); may be null.
+   */
+  public java.lang.Integer getFavoriteNumber() {
+    return favorite_number;
+  }
+
+  /**
+   * Sets the value of the 'favorite_number' field (schema union ["int","null"]).
+   * @param value the value to set; stored as given, null included.
+   */
+  public void setFavoriteNumber(java.lang.Integer value) {
+    this.favorite_number = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_color' field (schema union ["string","null"]); may be null.
+   */
+  public java.lang.CharSequence getFavoriteColor() {
+    return favorite_color;
+  }
+
+  /**
+   * Sets the value of the 'favorite_color' field (schema union ["string","null"]).
+   * @param value the value to set; stored as given, null included.
+   */
+  public void setFavoriteColor(java.lang.CharSequence value) {
+    this.favorite_color = value;
+  }
+
+  /**
+   * Gets the value of the 'type_long_test' field (schema union ["long","null"]); may be null.
+   */
+  public java.lang.Long getTypeLongTest() {
+    return type_long_test;
+  }
+
+  /**
+   * Sets the value of the 'type_long_test' field (schema union ["long","null"]).
+   * @param value the value to set; stored as given, null included.
+   */
+  public void setTypeLongTest(java.lang.Long value) {
+    this.type_long_test = value;
+  }
+
+  /**
+   * Gets the value of the 'type_double_test' field, declared in the schema as the single-branch union ["double"] and exposed here as Object.
+   */
+  public java.lang.Object getTypeDoubleTest() {
+    return type_double_test;
+  }
+
+  /**
+   * Sets the value of the 'type_double_test' field (schema union ["double"], typed Object here).
+   * @param value the value to set; stored as given, with no type check in this method.
+   */
+  public void setTypeDoubleTest(java.lang.Object value) {
+    this.type_double_test = value;
+  }
+
+  /**
+   * Gets the value of the 'type_null_test' field, declared in the schema as the single-branch union ["null"] and exposed here as Object.
+   */
+  public java.lang.Object getTypeNullTest() {
+    return type_null_test;
+  }
+
+  /**
+   * Sets the value of the 'type_null_test' field (schema union ["null"], typed Object here).
+   * @param value the value to set; stored as given, with no type check in this method.
+   */
+  public void setTypeNullTest(java.lang.Object value) {
+    this.type_null_test = value;
+  }
+
+  /**
+   * Gets the value of the 'type_bool_test' field, declared in the schema as the single-branch union ["boolean"] and exposed here as Object.
+   */
+  public java.lang.Object getTypeBoolTest() {
+    return type_bool_test;
+  }
+
+  /**
+   * Sets the value of the 'type_bool_test' field (schema union ["boolean"], typed Object here).
+   * @param value the value to set; stored as given, with no type check in this method.
+   */
+  public void setTypeBoolTest(java.lang.Object value) {
+    this.type_bool_test = value;
+  }
+
+  /**
+   * Gets the value of the 'type_array_string' field (schema array of "string"); returns the stored list itself, not a copy.
+   */
+  public java.util.List<java.lang.CharSequence> getTypeArrayString() {
+    return type_array_string;
+  }
+
+  /**
+   * Sets the value of the 'type_array_string' field (schema array of "string").
+   * @param value the value to set; the list is stored by reference, not copied.
+   */
+  public void setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
+    this.type_array_string = value;
+  }
+
+  /**
+   * Gets the value of the 'type_array_boolean' field (schema array of "boolean"); returns the stored list itself, not a copy.
+   */
+  public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
+    return type_array_boolean;
+  }
+
+  /**
+   * Sets the value of the 'type_array_boolean' field (schema array of "boolean").
+   * @param value the value to set; the list is stored by reference, not copied.
+   */
+  public void setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
+    this.type_array_boolean = value;
+  }
+
+  /**
+   * Gets the value of the 'type_nullable_array' field (schema union of "null" and array of "string", default null); may be null.
+   */
+  public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
+    return type_nullable_array;
+  }
+
+  /**
+   * Sets the value of the 'type_nullable_array' field (schema union of "null" and array of "string").
+   * @param value the value to set; stored by reference, null included.
+   */
+  public void setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
+    this.type_nullable_array = value;
+  }
+
+  /**
+   * Gets the value of the 'type_enum' field (schema enum Colors with symbols RED, GREEN, BLUE); may be null.
+   */
+  public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() {
+    return type_enum;
+  }
+
+  /**
+   * Sets the value of the 'type_enum' field (schema enum Colors).
+   * @param value the value to set; stored as given.
+   */
+  public void setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) {
+    this.type_enum = value;
+  }
+
+  /**
+   * Gets the value of the 'type_map' field (schema map with "long" values); returns the stored map itself, not a copy.
+   */
+  public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
+    return type_map;
+  }
+
+  /**
+   * Sets the value of the 'type_map' field (schema map with "long" values).
+   * @param value the value to set; the map is stored by reference, not copied.
+   */
+  public void setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
+    this.type_map = value;
+  }
+
+  /** Creates a new User RecordBuilder via Builder's private no-arg constructor, which only passes SCHEMA$ to the superclass. */
+  public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder() {
+    return new org.apache.flink.api.io.avro.generated.User.Builder();
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing Builder: each field of {@code other} that passes isValidValue is deep-copied and flagged as set. */
+  public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.io.avro.generated.User.Builder other) {
+    return new org.apache.flink.api.io.avro.generated.User.Builder(other);
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing User instance: fields of {@code other} that pass isValidValue are deep-copied and flagged as set. */
+  public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.io.avro.generated.User other) {
+    return new org.apache.flink.api.io.avro.generated.User.Builder(other);
+  }
+  
+  /**
+   * RecordBuilder for User instances.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
+    implements org.apache.avro.data.RecordBuilder<User> {
+
+    private java.lang.CharSequence name;
+    private java.lang.Integer favorite_number;
+    private java.lang.CharSequence favorite_color;
+    private java.lang.Long type_long_test;
+    private java.lang.Object type_double_test;
+    private java.lang.Object type_null_test;
+    private java.lang.Object type_bool_test;
+    private java.util.List<java.lang.CharSequence> type_array_string;
+    private java.util.List<java.lang.Boolean> type_array_boolean;
+    private java.util.List<java.lang.CharSequence> type_nullable_array;
+    private org.apache.flink.api.io.avro.generated.Colors type_enum;
+    private java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(org.apache.flink.api.io.avro.generated.User.SCHEMA$);
+    }
+    
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(org.apache.flink.api.io.avro.generated.User.Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.type_long_test)) {
+        this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
+        fieldSetFlags()[3] = true;
+      }
+      if (isValidValue(fields()[4], other.type_double_test)) {
+        this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
+        fieldSetFlags()[4] = true;
+      }
+      if (isValidValue(fields()[5], other.type_null_test)) {
+        this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
+        fieldSetFlags()[5] = true;
+      }
+      if (isValidValue(fields()[6], other.type_bool_test)) {
+        this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
+        fieldSetFlags()[6] = true;
+      }
+      if (isValidValue(fields()[7], other.type_array_string)) {
+        this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
+        fieldSetFlags()[7] = true;
+      }
+      if (isValidValue(fields()[8], other.type_array_boolean)) {
+        this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
+        fieldSetFlags()[8] = true;
+      }
+      if (isValidValue(fields()[9], other.type_nullable_array)) {
+        this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
+        fieldSetFlags()[9] = true;
+      }
+      if (isValidValue(fields()[10], other.type_enum)) {
+        this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
+        fieldSetFlags()[10] = true;
+      }
+      if (isValidValue(fields()[11], other.type_map)) {
+        this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
+        fieldSetFlags()[11] = true;
+      }
+    }
+    
+    /** Creates a Builder pre-populated from an existing User: every field whose value passes isValidValue is deep-copied and flagged as set. */
+    private Builder(org.apache.flink.api.io.avro.generated.User other) {
+            super(org.apache.flink.api.io.avro.generated.User.SCHEMA$);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.type_long_test)) {
+        this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
+        fieldSetFlags()[3] = true;
+      }
+      if (isValidValue(fields()[4], other.type_double_test)) {
+        this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
+        fieldSetFlags()[4] = true;
+      }
+      if (isValidValue(fields()[5], other.type_null_test)) {
+        this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
+        fieldSetFlags()[5] = true;
+      }
+      if (isValidValue(fields()[6], other.type_bool_test)) {
+        this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
+        fieldSetFlags()[6] = true;
+      }
+      if (isValidValue(fields()[7], other.type_array_string)) {
+        this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
+        fieldSetFlags()[7] = true;
+      }
+      if (isValidValue(fields()[8], other.type_array_boolean)) {
+        this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
+        fieldSetFlags()[8] = true;
+      }
+      if (isValidValue(fields()[9], other.type_nullable_array)) {
+        this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
+        fieldSetFlags()[9] = true;
+      }
+      if (isValidValue(fields()[10], other.type_enum)) {
+        this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
+        fieldSetFlags()[10] = true;
+      }
+      if (isValidValue(fields()[11], other.type_map)) {
+        this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
+        fieldSetFlags()[11] = true;
+      }
+    }
+
+    /** Returns the value this builder currently holds for the 'name' field, without consulting its set-flag. */
+    public java.lang.CharSequence getName() {
+      return name;
+    }
+    
+    /** Sets the 'name' field: runs validate(...) for field index 0, stores the value, raises the set-flag and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder setName(java.lang.CharSequence value) {
+      validate(fields()[0], value);
+      this.name = value;
+      fieldSetFlags()[0] = true;
+      return this; 
+    }
+    
+    /** Reports whether the set-flag for the 'name' field (index 0) is currently raised. */
+    public boolean hasName() {
+      return fieldSetFlags()[0];
+    }
+    
+    /** Clears the 'name' field: nulls the stored value, lowers the set-flag (index 0) and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearName() {
+      name = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /** Returns the value this builder currently holds for the 'favorite_number' field, without consulting its set-flag. */
+    public java.lang.Integer getFavoriteNumber() {
+      return favorite_number;
+    }
+    
+    /** Sets the 'favorite_number' field: runs validate(...) for field index 1, stores the value, raises the set-flag and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder setFavoriteNumber(java.lang.Integer value) {
+      validate(fields()[1], value);
+      this.favorite_number = value;
+      fieldSetFlags()[1] = true;
+      return this; 
+    }
+    
+    /** Reports whether the set-flag for the 'favorite_number' field (index 1) is currently raised. */
+    public boolean hasFavoriteNumber() {
+      return fieldSetFlags()[1];
+    }
+    
+    /** Clears the 'favorite_number' field: nulls the stored value, lowers the set-flag (index 1) and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearFavoriteNumber() {
+      favorite_number = null;
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /** Returns the value this builder currently holds for the 'favorite_color' field, without consulting its set-flag. */
+    public java.lang.CharSequence getFavoriteColor() {
+      return favorite_color;
+    }
+    
+    /** Sets the 'favorite_color' field: runs validate(...) for field index 2, stores the value, raises the set-flag and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder setFavoriteColor(java.lang.CharSequence value) {
+      validate(fields()[2], value);
+      this.favorite_color = value;
+      fieldSetFlags()[2] = true;
+      return this; 
+    }
+    
+    /** Reports whether the set-flag for the 'favorite_color' field (index 2) is currently raised. */
+    public boolean hasFavoriteColor() {
+      return fieldSetFlags()[2];
+    }
+    
+    /** Clears the 'favorite_color' field: nulls the stored value, lowers the set-flag (index 2) and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearFavoriteColor() {
+      favorite_color = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    /** Returns the value this builder currently holds for the 'type_long_test' field, without consulting its set-flag. */
+    public java.lang.Long getTypeLongTest() {
+      return type_long_test;
+    }
+    
+    /** Sets the 'type_long_test' field: runs validate(...) for field index 3, stores the value, raises the set-flag and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeLongTest(java.lang.Long value) {
+      validate(fields()[3], value);
+      this.type_long_test = value;
+      fieldSetFlags()[3] = true;
+      return this; 
+    }
+    
+    /** Reports whether the set-flag for the 'type_long_test' field (index 3) is currently raised. */
+    public boolean hasTypeLongTest() {
+      return fieldSetFlags()[3];
+    }
+    
+    /** Clears the 'type_long_test' field: nulls the stored value, lowers the set-flag (index 3) and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeLongTest() {
+      type_long_test = null;
+      fieldSetFlags()[3] = false;
+      return this;
+    }
+
+    /** Returns the value this builder currently holds for the 'type_double_test' field, without consulting its set-flag. */
+    public java.lang.Object getTypeDoubleTest() {
+      return type_double_test;
+    }
+    
+    /** Sets the 'type_double_test' field: runs validate(...) for field index 4, stores the value, raises the set-flag and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeDoubleTest(java.lang.Object value) {
+      validate(fields()[4], value);
+      this.type_double_test = value;
+      fieldSetFlags()[4] = true;
+      return this; 
+    }
+    
+    /** Reports whether the set-flag for the 'type_double_test' field (index 4) is currently raised. */
+    public boolean hasTypeDoubleTest() {
+      return fieldSetFlags()[4];
+    }
+    
+    /** Clears the 'type_double_test' field: nulls the stored value, lowers the set-flag (index 4) and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeDoubleTest() {
+      type_double_test = null;
+      fieldSetFlags()[4] = false;
+      return this;
+    }
+
+    /** Returns the value this builder currently holds for the 'type_null_test' field, without consulting its set-flag. */
+    public java.lang.Object getTypeNullTest() {
+      return type_null_test;
+    }
+    
+    /** Sets the 'type_null_test' field: runs validate(...) for field index 5, stores the value, raises the set-flag and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeNullTest(java.lang.Object value) {
+      validate(fields()[5], value);
+      this.type_null_test = value;
+      fieldSetFlags()[5] = true;
+      return this; 
+    }
+    
+    /** Reports whether the set-flag for the 'type_null_test' field (index 5) is currently raised. */
+    public boolean hasTypeNullTest() {
+      return fieldSetFlags()[5];
+    }
+    
+    /** Clears the 'type_null_test' field: nulls the stored value, lowers the set-flag (index 5) and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeNullTest() {
+      type_null_test = null;
+      fieldSetFlags()[5] = false;
+      return this;
+    }
+
+    /** Returns the value this builder currently holds for the 'type_bool_test' field, without consulting its set-flag. */
+    public java.lang.Object getTypeBoolTest() {
+      return type_bool_test;
+    }
+    
+    /** Sets the 'type_bool_test' field: runs validate(...) for field index 6, stores the value, raises the set-flag and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeBoolTest(java.lang.Object value) {
+      validate(fields()[6], value);
+      this.type_bool_test = value;
+      fieldSetFlags()[6] = true;
+      return this; 
+    }
+    
+    /** Reports whether the set-flag for the 'type_bool_test' field (index 6) is currently raised. */
+    public boolean hasTypeBoolTest() {
+      return fieldSetFlags()[6];
+    }
+    
+    /** Clears the 'type_bool_test' field: nulls the stored value, lowers the set-flag (index 6) and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeBoolTest() {
+      type_bool_test = null;
+      fieldSetFlags()[6] = false;
+      return this;
+    }
+
+    /** Returns the list this builder currently holds for the 'type_array_string' field (the same reference, not a copy), without consulting its set-flag. */
+    public java.util.List<java.lang.CharSequence> getTypeArrayString() {
+      return type_array_string;
+    }
+    
+    /** Sets the 'type_array_string' field: runs validate(...) for field index 7, stores the given list reference, raises the set-flag and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
+      validate(fields()[7], value);
+      this.type_array_string = value;
+      fieldSetFlags()[7] = true;
+      return this; 
+    }
+    
+    /** Reports whether the set-flag for the 'type_array_string' field (index 7) is currently raised. */
+    public boolean hasTypeArrayString() {
+      return fieldSetFlags()[7];
+    }
+    
+    /** Clears the 'type_array_string' field: nulls the stored value, lowers the set-flag (index 7) and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeArrayString() {
+      type_array_string = null;
+      fieldSetFlags()[7] = false;
+      return this;
+    }
+
+    /** Returns the list this builder currently holds for the 'type_array_boolean' field (the same reference, not a copy), without consulting its set-flag. */
+    public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
+      return type_array_boolean;
+    }
+    
+    /** Sets the 'type_array_boolean' field: runs validate(...) for field index 8, stores the given list reference, raises the set-flag and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
+      validate(fields()[8], value);
+      this.type_array_boolean = value;
+      fieldSetFlags()[8] = true;
+      return this; 
+    }
+    
+    /** Reports whether the set-flag for the 'type_array_boolean' field (index 8) is currently raised. */
+    public boolean hasTypeArrayBoolean() {
+      return fieldSetFlags()[8];
+    }
+    
+    /** Clears the 'type_array_boolean' field: nulls the stored value, lowers the set-flag (index 8) and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeArrayBoolean() {
+      type_array_boolean = null;
+      fieldSetFlags()[8] = false;
+      return this;
+    }
+
+    /** Returns the list this builder currently holds for the 'type_nullable_array' field (the same reference, not a copy), without consulting its set-flag. */
+    public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
+      return type_nullable_array;
+    }
+    
+    /** Sets the 'type_nullable_array' field: runs validate(...) for field index 9, stores the given list reference, raises the set-flag and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
+      validate(fields()[9], value);
+      this.type_nullable_array = value;
+      fieldSetFlags()[9] = true;
+      return this; 
+    }
+    
+    /** Reports whether the set-flag for the 'type_nullable_array' field (index 9) is currently raised. */
+    public boolean hasTypeNullableArray() {
+      return fieldSetFlags()[9];
+    }
+    
+    /** Clears the 'type_nullable_array' field: nulls the stored value, lowers the set-flag (index 9) and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeNullableArray() {
+      type_nullable_array = null;
+      fieldSetFlags()[9] = false;
+      return this;
+    }
+
+    /** Returns the value this builder currently holds for the 'type_enum' field, without consulting its set-flag. */
+    public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() {
+      return type_enum;
+    }
+    
+    /** Sets the 'type_enum' field: runs validate(...) for field index 10, stores the value, raises the set-flag and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) {
+      validate(fields()[10], value);
+      this.type_enum = value;
+      fieldSetFlags()[10] = true;
+      return this; 
+    }
+    
+    /** Reports whether the set-flag for the 'type_enum' field (index 10) is currently raised. */
+    public boolean hasTypeEnum() {
+      return fieldSetFlags()[10];
+    }
+    
+    /** Clears the 'type_enum' field: nulls the stored value, lowers the set-flag (index 10) and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeEnum() {
+      type_enum = null;
+      fieldSetFlags()[10] = false;
+      return this;
+    }
+
+    /** Returns the map this builder currently holds for the 'type_map' field (the same reference, not a copy), without consulting its set-flag. */
+    public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
+      return type_map;
+    }
+    
+    /** Sets the 'type_map' field: runs validate(...) for field index 11, stores the given map reference, raises the set-flag and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
+      validate(fields()[11], value);
+      this.type_map = value;
+      fieldSetFlags()[11] = true;
+      return this; 
+    }
+    
+    /** Reports whether the set-flag for the 'type_map' field (index 11) is currently raised. */
+    public boolean hasTypeMap() {
+      return fieldSetFlags()[11];
+    }
+    
+    /** Clears the 'type_map' field: nulls the stored value, lowers the set-flag (index 11) and returns this builder. */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeMap() {
+      type_map = null;
+      fieldSetFlags()[11] = false;
+      return this;
+    }
+
+    @Override
+    public User build() { // Assembles a new User record from the current state of this builder.
+      try {
+        User record = new User(); // each field below: the builder's value if its set-flag is raised, otherwise defaultValue(...) for that field
+        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
+        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
+        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
+        record.type_long_test = fieldSetFlags()[3] ? this.type_long_test : (java.lang.Long) defaultValue(fields()[3]);
+        record.type_double_test = fieldSetFlags()[4] ? this.type_double_test : (java.lang.Object) defaultValue(fields()[4]);
+        record.type_null_test = fieldSetFlags()[5] ? this.type_null_test : (java.lang.Object) defaultValue(fields()[5]);
+        record.type_bool_test = fieldSetFlags()[6] ? this.type_bool_test : (java.lang.Object) defaultValue(fields()[6]);
+        record.type_array_string = fieldSetFlags()[7] ? this.type_array_string : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[7]);
+        record.type_array_boolean = fieldSetFlags()[8] ? this.type_array_boolean : (java.util.List<java.lang.Boolean>) defaultValue(fields()[8]);
+        record.type_nullable_array = fieldSetFlags()[9] ? this.type_nullable_array : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[9]);
+        record.type_enum = fieldSetFlags()[10] ? this.type_enum : (org.apache.flink.api.io.avro.generated.Colors) defaultValue(fields()[10]);
+        record.type_map = fieldSetFlags()[11] ? this.type_map : (java.util.Map<java.lang.CharSequence,java.lang.Long>) defaultValue(fields()[11]);
+        return record;
+      } catch (Exception e) { // any failure while assembling the record is rethrown wrapped in an unchecked AvroRuntimeException
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
new file mode 100644
index 0000000..23fbab3
--- /dev/null
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Checks that type extraction treats a POJO read through AvroInputFormat as a PojoTypeInfo. */
+public class AvroInputFormatTypeExtractionTest {
+
+	/** The type must be the same whether taken from the format directly or from a DataSet created on it. */
+	@Test
+	public void testTypeExtraction() throws Exception {
+		// Unexpected exceptions propagate so that JUnit reports them with their full stack trace.
+		// Placeholder path: the test only queries type information (presumably the file is never opened).
+		InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class);
+
+		TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format);
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<MyAvroType> input = env.createInput(format);
+		TypeInformation<?> typeInfoDataSet = input.getType();
+
+
+		Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
+		Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
+
+		Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
+		Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
+	}
+
+	/** POJO fixture: one public field with accessors and one private field exposed through a getter/setter pair. */
+	public static final class MyAvroType {
+
+		public String theString;
+
+		private double aDouble;
+
+		public double getaDouble() {
+			return aDouble;
+		}
+
+		public void setaDouble(double aDouble) {
+			this.aDouble = aDouble;
+		}
+
+		public void setTheString(String theString) {
+			this.theString = theString;
+		}
+
+		public String getTheString() {
+			return theString;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/resources/avro/user.avsc b/flink-staging/flink-avro/src/test/resources/avro/user.avsc
new file mode 100644
index 0000000..af3cb75
--- /dev/null
+++ b/flink-staging/flink-avro/src/test/resources/avro/user.avsc
@@ -0,0 +1,19 @@
+
+{"namespace": "org.apache.flink.api.java.record.io.avro.generated",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "type_long_test", "type": ["long", "null"]},
+     {"name": "type_double_test", "type": ["double"]},
+     {"name": "type_null_test", "type": ["null"]},
+     {"name": "type_bool_test", "type": ["boolean"]},
+     {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},  
+     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, 
+     {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
+     {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}},
+     {"name": "type_map", "type": {"type": "map", "values": "long"}} 
+ ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-avro/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/resources/log4j-test.properties b/flink-staging/flink-avro/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..0b686e5
--- /dev/null
+++ b/flink-staging/flink-avro/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF (logging disabled) and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-avro/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/resources/logback-test.xml b/flink-staging/flink-avro/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-staging/flink-avro/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-avro/src/test/resources/testdata.avro
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/resources/testdata.avro b/flink-staging/flink-avro/src/test/resources/testdata.avro
new file mode 100644
index 0000000..45308b9
Binary files /dev/null and b/flink-staging/flink-avro/src/test/resources/testdata.avro differ

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/pom.xml b/flink-staging/flink-hadoop-compatibility/pom.xml
new file mode 100644
index 0000000..e39f8b6
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	
+	<modelVersion>4.0.0</modelVersion>
+	
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-staging</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-hadoop-compatibility</artifactId>
+	<name>flink-hadoop-compatibility</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<profiles>
+		<profile>
+			<id>hadoop-2</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop2--><name>!hadoop.profile</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
new file mode 100644
index 0000000..326a1c4
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Flink {@link InputFormat} that reads from a Hadoop input format of the {@code mapred} API.
+ * Every Hadoop key/value pair is handed to Flink as a {@link Tuple2}.
+ *
+ * <p>The wrapped input format instance itself is not serialized. Only its class name, the key
+ * and value class names and the {@link JobConf} are written; on deserialization the input
+ * format is re-created via {@code Class.newInstance()}, so it needs an accessible no-argument
+ * constructor (see the custom serialization methods at the end of this class).
+ *
+ * @param <K> The type of the Hadoop keys.
+ * @param <V> The type of the Hadoop values.
+ */
+public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class);
+	
+	// The following four fields are shipped by writeObject() / restored by readObject().
+	private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
+	private Class<K> keyClass;
+	private Class<V> valueClass;
+	private JobConf jobConf;
+	
+	// Key and value objects are created once per split in open() and reused for every record.
+	private transient K key;
+	private transient V value;
+	
+	private transient RecordReader<K, V> recordReader;
+	// true if 'key'/'value' hold a record that was read ahead but not yet returned by nextRecord()
+	private transient boolean fetched = false;
+	// result of the last read-ahead; only meaningful while 'fetched' is true
+	private transient boolean hasNext;
+
+	/**
+	 * Creates an empty wrapper. The input format and the JobConf have to be supplied through
+	 * {@link #setHadoopInputFormat} and {@link #setJobConf} before the wrapper is used.
+	 */
+	public HadoopInputFormat() {
+		super();
+	}
+	
+	/**
+	 * Creates a wrapper around the given Hadoop input format.
+	 * The JobConf is passed through {@code HadoopUtils.mergeHadoopConf} and then set on the
+	 * input format via {@code ReflectionUtils.setConf}.
+	 *
+	 * @param mapredInputFormat The Hadoop input format to wrap.
+	 * @param key The class of the keys produced by the input format.
+	 * @param value The class of the values produced by the input format.
+	 * @param job The JobConf used to configure the input format.
+	 */
+	public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+		super();
+		this.mapredInputFormat = mapredInputFormat;
+		this.keyClass = key;
+		this.valueClass = value;
+		HadoopUtils.mergeHadoopConf(job);
+		this.jobConf = job;
+		ReflectionUtils.setConf(mapredInputFormat, jobConf);
+	}
+	
+	/**
+	 * Replaces the JobConf. Note that, unlike the constructor, this neither merges the
+	 * configuration nor re-configures the wrapped input format.
+	 *
+	 * @param job The new JobConf.
+	 */
+	public void setJobConf(JobConf job) {
+		this.jobConf = job;
+	}
+	
+	/**
+	 * @return The wrapped Hadoop input format.
+	 */
+	public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() {
+		return mapredInputFormat;
+	}
+	
+	/**
+	 * @param mapredInputFormat The Hadoop input format to wrap.
+	 */
+	public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat) {
+		this.mapredInputFormat = mapredInputFormat;
+	}
+	
+	/**
+	 * @return The JobConf currently held by this wrapper.
+	 */
+	public JobConf getJobConf() {
+		return jobConf;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  InputFormat
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void configure(Configuration parameters) {
+		// nothing to do
+	}
+	
+	/**
+	 * Gathers file-based statistics if the wrapped format is a Hadoop {@code FileInputFormat}.
+	 *
+	 * @param cachedStats Previously computed statistics, may be null.
+	 * @return The (possibly cached) statistics, or null if the wrapped format is not file based
+	 *         or the statistics could not be determined. Failures are logged, not thrown.
+	 */
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+		// only gather base statistics for FileInputFormats
+		if (!(mapredInputFormat instanceof FileInputFormat)) {
+			return null;
+		}
+		
+		// instanceof is false for null, so no separate null check is needed
+		final FileBaseStatistics cachedFileStats = (cachedStats instanceof FileBaseStatistics) ?
+				(FileBaseStatistics) cachedStats : null;
+		
+		try {
+			final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf);
+			
+			return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
+		} catch (IOException ioex) {
+			if (LOG.isWarnEnabled()) {
+				LOG.warn("Could not determine statistics due to an io error: "
+						+ ioex.getMessage());
+			}
+		} catch (Throwable t) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Unexpected problem while getting the file statistics: "
+						+ t.getMessage(), t);
+			}
+		}
+		
+		// no statistics available
+		return null;
+	}
+	
+	/**
+	 * Asks the Hadoop input format for its splits and wraps each of them, using the array index
+	 * as the split number.
+	 *
+	 * @param minNumSplits The split count hint that is forwarded to Hadoop's {@code getSplits}.
+	 * @return One wrapped split per Hadoop split.
+	 */
+	@Override
+	public HadoopInputSplit[] createInputSplits(int minNumSplits)
+			throws IOException {
+		org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits);
+		HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
+		for (int i = 0; i < splitArray.length; i++) {
+			hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
+		}
+		return hiSplit;
+	}
+	
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
+		return new LocatableInputSplitAssigner(inputSplits);
+	}
+	
+	/**
+	 * Opens a record reader for the given split and creates the reusable key and value objects.
+	 *
+	 * @param split The split to read.
+	 */
+	@Override
+	public void open(HadoopInputSplit split) throws IOException {
+		this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
+		if (this.recordReader instanceof Configurable) {
+			((Configurable) this.recordReader).setConf(jobConf);
+		}
+		key = this.recordReader.createKey();
+		value = this.recordReader.createValue();
+		this.fetched = false;
+	}
+	
+	/**
+	 * Reads one record ahead if necessary to find out whether the split is exhausted.
+	 *
+	 * @return true if the record reader has no further record.
+	 */
+	@Override
+	public boolean reachedEnd() throws IOException {
+		if (!fetched) {
+			fetchNext();
+		}
+		return !hasNext;
+	}
+	
+	/** Reads the next record into the reusable key/value objects and remembers the outcome. */
+	private void fetchNext() throws IOException {
+		hasNext = this.recordReader.next(key, value);
+		fetched = true;
+	}
+	
+	/**
+	 * Returns the next key/value pair in the given tuple.
+	 * The tuple fields point to the key and value objects that are reused for all records of
+	 * the split.
+	 *
+	 * @param record The tuple to fill.
+	 * @return The filled tuple, or null if the record reader has no further record.
+	 */
+	@Override
+	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
+		if (!fetched) {
+			fetchNext();
+		}
+		if (!hasNext) {
+			return null;
+		}
+		record.f0 = key;
+		record.f1 = value;
+		// the read-ahead record has been consumed
+		fetched = false;
+		return record;
+	}
+	
+	/**
+	 * Closes the record reader, if one was opened.
+	 */
+	@Override
+	public void close() throws IOException {
+		// open() may have failed or never been called; do not hide that behind an NPE
+		if (this.recordReader != null) {
+			this.recordReader.close();
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Helper methods
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Computes the total length and the latest modification time of the given input paths.
+	 * Directories are listed one level deep; nested directories are ignored.
+	 *
+	 * @param cachedStats Previously computed statistics, may be null.
+	 * @param hadoopFilePaths The input paths of the Hadoop job.
+	 * @param files List that collects the status of every encountered file.
+	 * @return The cached statistics if no path was modified since they were computed,
+	 *         otherwise newly computed statistics.
+	 */
+	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
+			ArrayList<FileStatus> files) throws IOException {
+		
+		long latestModTime = 0L;
+		
+		// get the file info and check whether the cached statistics are still valid.
+		for (org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
+			
+			// translate the Hadoop path into a Flink path and use Flink's file system abstraction
+			final Path filePath = new Path(hadoopPath.toUri());
+			final FileSystem fs = FileSystem.get(filePath.toUri());
+			
+			final FileStatus file = fs.getFileStatus(filePath);
+			latestModTime = Math.max(latestModTime, file.getModificationTime());
+			
+			// enumerate all files and check their modification time stamp.
+			if (file.isDir()) {
+				FileStatus[] fss = fs.listStatus(filePath);
+				files.ensureCapacity(files.size() + fss.length);
+				
+				for (FileStatus s : fss) {
+					if (!s.isDir()) {
+						files.add(s);
+						latestModTime = Math.max(s.getModificationTime(), latestModTime);
+					}
+				}
+			} else {
+				files.add(file);
+			}
+		}
+		
+		// check whether the cached statistics are still valid, if we have any
+		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
+			return cachedStats;
+		}
+		
+		// calculate the whole length
+		long len = 0;
+		for (FileStatus s : files) {
+			len += s.getLen();
+		}
+		
+		// sanity check
+		if (len <= 0) {
+			len = BaseStatistics.SIZE_UNKNOWN;
+		}
+		
+		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Writes the class names of the input format, key and value, followed by the JobConf.
+	 * The order has to match {@link #readObject(ObjectInputStream)}.
+	 */
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(mapredInputFormat.getClass().getName());
+		out.writeUTF(keyClass.getName());
+		out.writeUTF(valueClass.getName());
+		jobConf.write(out);
+	}
+	
+	/**
+	 * Restores the state written by {@link #writeObject(ObjectOutputStream)}. All classes are
+	 * resolved with the context class loader of the current thread, and the input format is
+	 * instantiated reflectively and configured with the restored JobConf.
+	 */
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		final String hadoopInputFormatClassName = in.readUTF();
+		final String keyClassName = in.readUTF();
+		final String valueClassName = in.readUTF();
+		
+		// no constructor or field initializer runs during deserialization, so the field is unset here
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+		
+		final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+		try {
+			this.mapredInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, classLoader).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
+		}
+		try {
+			this.keyClass = (Class<K>) Class.forName(keyClassName, true, classLoader);
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to find key class.", e);
+		}
+		try {
+			this.valueClass = (Class<V>) Class.forName(valueClassName, true, classLoader);
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to find value class.", e);
+		}
+		ReflectionUtils.setConf(mapredInputFormat, jobConf);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  ResultTypeQueryable
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * @return Tuple type information built from the key class and the value class.
+	 */
+	@Override
+	public TypeInformation<Tuple2<K,V>> getProducedType() {
+		return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
new file mode 100644
index 0000000..dfe0067
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+ *
+ * <p>Only the class of the Mapper and the JobConf are serialized. After deserialization the
+ * Mapper is a new instance of that class, so any state held by the original Mapper object is
+ * not carried over.
+ */
+@SuppressWarnings("rawtypes")
+public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+					extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>> 
+					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// both fields are transient because they are shipped by the custom writeObject()/readObject()
+	private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
+	private transient JobConf jobConf;
+
+	// created in open()
+	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
+	private transient Reporter reporter;
+	
+	/**
+	 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+	 * 
+	 * @param hadoopMapper The Hadoop Mapper to wrap.
+	 */
+	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
+		this(hadoopMapper, new JobConf());
+	}
+	
+	/**
+	 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+	 * The Hadoop Mapper is configured with the provided JobConf.
+	 * 
+	 * @param hadoopMapper The Hadoop Mapper to wrap.
+	 * @param conf The JobConf that is used to configure the Hadoop Mapper.
+	 * @throws NullPointerException Thrown, if the Mapper or the JobConf is null.
+	 */
+	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
+		if (hadoopMapper == null) {
+			throw new NullPointerException("Mapper may not be null.");
+		}
+		if (conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+		
+		this.mapper = hadoopMapper;
+		this.jobConf = conf;
+	}
+
+	/**
+	 * Configures the Hadoop Mapper with the JobConf and creates the reporter and the output
+	 * collector that are passed to every {@code map()} call.
+	 */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.mapper.configure(jobConf);
+		
+		this.reporter = new HadoopDummyReporter();
+		this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+	}
+
+	/**
+	 * Closes the Hadoop Mapper, which completes the Mapper life cycle that was started with
+	 * {@code configure()} in {@link #open(Configuration)}.
+	 */
+	@Override
+	public void close() throws Exception {
+		try {
+			this.mapper.close();
+		} finally {
+			super.close();
+		}
+	}
+
+	/**
+	 * Forwards the key and the value of the tuple to the Hadoop Mapper. Everything the Mapper
+	 * emits is passed on to the given Flink collector.
+	 */
+	@Override
+	public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) 
+			throws Exception {
+		outputCollector.setFlinkCollector(out);
+		mapper.map(value.f0, value.f1, outputCollector, reporter);
+	}
+
+	/**
+	 * Derives the output tuple type from the third and fourth type parameter (output key and
+	 * output value) with which the wrapped Mapper class implements {@link Mapper}.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {	
+		final Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
+		final Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
+		
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass(outValClass);
+		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
+	}
+	
+	/**
+	 * Custom serialization method: writes the class of the Mapper followed by the JobConf.
+	 *
+	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">java.io.Serializable</a>
+	 */
+	private void writeObject(final ObjectOutputStream out) throws IOException {
+		out.writeObject(mapper.getClass());
+		jobConf.write(out);
+	}
+
+	/**
+	 * Custom deserialization method: reads the Mapper class, creates a new Mapper instance of
+	 * it and restores the JobConf.
+	 *
+	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">java.io.Serializable</a>
+	 */
+	@SuppressWarnings("unchecked")
+	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+		Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass = 
+				(Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+		mapper = InstantiationUtil.instantiate(mapperClass);
+		
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
new file mode 100644
index 0000000..f3abfcd
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private JobConf jobConf;
+	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;	
+	private transient RecordWriter<K,V> recordWriter;	
+	private transient FileOutputCommitter fileOutputCommitter;
+	private transient TaskAttemptContext context;
+	private transient JobContext jobContext;
+	
+	public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat, JobConf job) {
+		super();
+		this.mapredOutputFormat = mapredOutputFormat;
+		HadoopUtils.mergeHadoopConf(job);
+		this.jobConf = job;
+	}
+	
+	public void setJobConf(JobConf job) {
+		this.jobConf = job;
+	}
+	
+	public JobConf getJobConf() {
+		return jobConf;
+	}
+	
+	public org.apache.hadoop.mapred.OutputFormat<K,V> getHadoopOutputFormat() {
+		return mapredOutputFormat;
+	}
+	
+	public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat) {
+		this.mapredOutputFormat = mapredOutputFormat;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  OutputFormat
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void configure(Configuration parameters) {
+		// nothing to do
+	}
+	
+	/**
+	 * create the temporary output file for hadoop RecordWriter.
+	 * @param taskNumber The number of the parallel instance.
+	 * @param numTasks The number of parallel tasks.
+	 * @throws IOException
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		if (Integer.toString(taskNumber + 1).length() > 6) {
+			throw new IOException("Task id too large.");
+		}
+		
+		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" 
+				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") 
+				+ Integer.toString(taskNumber + 1) 
+				+ "_0");
+		
+		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
+		this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
+		// for hadoop 2.2
+		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+		this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
+		
+		try {
+			this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		
+		this.fileOutputCommitter = new FileOutputCommitter();
+		
+		try {
+			this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		
+		this.fileOutputCommitter.setupJob(jobContext);
+		
+		this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
+	}
+	
+	@Override
+	public void writeRecord(Tuple2<K, V> record) throws IOException {
+		this.recordWriter.write(record.f0, record.f1);
+	}
+	
+	/**
+	 * commit the task by moving the output file out from the temporary directory.
+	 * @throws IOException
+	 */
+	@Override
+	public void close() throws IOException {
+		this.recordWriter.close(new HadoopDummyReporter());
+		
+		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
+			this.fileOutputCommitter.commitTask(this.context);
+		}
+	}
+	
+	@Override
+	public void finalizeGlobal(int parallelism) throws IOException {
+
+		try {
+			JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+			FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
+			
+			// finalize HDFS output format
+			fileOutputCommitter.commitJob(jobContext);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+	
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(mapredOutputFormat.getClass().getName());
+		jobConf.write(out);
+	}
+	
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		String hadoopOutputFormatName = in.readUTF();
+		if(jobConf == null) {
+			jobConf = new JobConf();
+		}
+		jobConf.readFields(in);
+		try {
+			this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(hadoopOutputFormatName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
+		}
+		ReflectionUtils.setConf(mapredOutputFormat, jobConf);
+	}
+}


Mime
View raw message