flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [3/3] incubator-flink git commit: [FLINK-1252] Add support for serializing Date and Enums in the Java API
Date Thu, 20 Nov 2014 14:30:20 GMT
[FLINK-1252] Add support for serializing Date and Enums in the Java API


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/591f16dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/591f16dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/591f16dd

Branch: refs/heads/master
Commit: 591f16dd8f80cc2a5b2fdc6654c3c2d625119faa
Parents: 4203bf9
Author: Robert Metzger <metzgerr@web.de>
Authored: Wed Nov 19 00:41:54 2014 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Thu Nov 20 15:29:44 2014 +0100

----------------------------------------------------------------------
 .../api/common/typeinfo/BasicTypeInfo.java      |   6 +-
 .../common/typeutils/base/DateComparator.java   |  89 +++++++++++++++
 .../common/typeutils/base/DateSerializer.java   |  85 ++++++++++++++
 .../common/typeutils/base/EnumComparator.java   |  88 +++++++++++++++
 .../common/typeutils/base/EnumSerializer.java   | 112 +++++++++++++++++++
 .../typeutils/base/DateComparatorTest.java      |  60 ++++++++++
 .../typeutils/base/DateSerializerTest.java      |  53 +++++++++
 .../flink/api/java/typeutils/EnumTypeInfo.java  | 101 +++++++++++++++++
 .../flink/api/java/typeutils/TypeExtractor.java |   4 +-
 .../type/extractor/PojoTypeExtractionTest.java  |   2 +-
 .../test/javaApiOperators/ReduceITCase.java     |  66 ++++++++++-
 .../util/CollectionDataSets.java                |  29 ++++-
 12 files changed, 689 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index a152b4a..f27da07 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.typeinfo;
 
 import java.lang.reflect.Constructor;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -30,6 +31,8 @@ import org.apache.flink.api.common.typeutils.base.ByteComparator;
 import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.common.typeutils.base.CharComparator;
 import org.apache.flink.api.common.typeutils.base.CharSerializer;
+import org.apache.flink.api.common.typeutils.base.DateComparator;
+import org.apache.flink.api.common.typeutils.base.DateSerializer;
 import org.apache.flink.api.common.typeutils.base.DoubleComparator;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatComparator;
@@ -58,6 +61,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements
AtomicType<T
 	public static final BasicTypeInfo<Float> FLOAT_TYPE_INFO = new BasicTypeInfo<Float>(Float.class,
FloatSerializer.INSTANCE, FloatComparator.class);
 	public static final BasicTypeInfo<Double> DOUBLE_TYPE_INFO = new BasicTypeInfo<Double>(Double.class,
DoubleSerializer.INSTANCE, DoubleComparator.class);
 	public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<Character>(Character.class,
CharSerializer.INSTANCE, CharComparator.class);
+	public static final BasicTypeInfo<Date> DATE_TYPE_INFO = new BasicTypeInfo<Date>(Date.class,
DateSerializer.INSTANCE, DateComparator.class);
 	
 	// --------------------------------------------------------------------------------------------
 
@@ -146,7 +150,6 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements
AtomicType<T
 			throw new NullPointerException();
 		}
 		
-		@SuppressWarnings("unchecked")
 		BasicTypeInfo<X> info = (BasicTypeInfo<X>) TYPES.get(type);
 		return info;
 	}
@@ -181,5 +184,6 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements
AtomicType<T
 		TYPES.put(double.class, DOUBLE_TYPE_INFO);
 		TYPES.put(Character.class, CHAR_TYPE_INFO);
 		TYPES.put(char.class, CHAR_TYPE_INFO);
+		TYPES.put(Date.class, DATE_TYPE_INFO);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java
new file mode 100644
index 0000000..4bc37f2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.util.Date;
+
+
+public final class DateComparator extends BasicTypeComparator<Date> {
+
+	private static final long serialVersionUID = 1L;
+
+
+	public DateComparator(boolean ascending) {
+		super(ascending);
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws
IOException {
+		long l1 = firstSource.readLong();
+		long l2 = secondSource.readLong();
+		int comp = (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1)); 
+		return ascendingComparison ? comp : -comp;
+	}
+
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return true;
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return 8;
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < 8;
+	}
+
+	@Override
+	public void putNormalizedKey(Date lValue, MemorySegment target, int offset, int numBytes)
{
+		long value = lValue.getTime() - Long.MIN_VALUE;
+		
+		// see IntValue for an explanation of the logic
+		if (numBytes == 8) {
+			// default case, full normalized key
+			target.putLongBigEndian(offset, value);
+		}
+		else if (numBytes <= 0) {
+		}
+		else if (numBytes < 8) {
+			for (int i = 0; numBytes > 0; numBytes--, i++) {
+				target.put(offset + i, (byte) (value >>> ((7-i)<<3)));
+			}
+		}
+		else {
+			target.putLongBigEndian(offset, value);
+			for (int i = 8; i < numBytes; i++) {
+				target.put(offset + i, (byte) 0);
+			}
+		}
+	}
+
+	@Override
+	public DateComparator duplicate() {
+		return new DateComparator(ascendingComparison);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
new file mode 100644
index 0000000..4bd2ea8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Date;
+
+
+public final class DateSerializer extends TypeSerializerSingleton<Date> {
+
+	private static final long serialVersionUID = 1L;
+	
+	public static final DateSerializer INSTANCE = new DateSerializer();
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public boolean isStateful() {
+		return false;
+	}
+	
+	@Override
+	public Date createInstance() {
+		return new Date();
+	}
+
+	@Override
+	public Date copy(Date from) {
+		return new Date(from.getTime());
+	}
+	
+	@Override
+	public Date copy(Date from, Date reuse) {
+		reuse.setTime(from.getTime());
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return 8;
+	}
+
+	@Override
+	public void serialize(Date record, DataOutputView target) throws IOException {
+		target.writeLong(record.getTime());
+	}
+
+	@Override
+	public Date deserialize(DataInputView source) throws IOException {
+		return new Date(source.readLong());
+	}
+	
+	@Override
+	public Date deserialize(Date reuse, DataInputView source) throws IOException {
+		reuse.setTime(source.readLong());
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		target.writeLong(source.readLong());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
new file mode 100644
index 0000000..af6d30d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+
+
+public final class EnumComparator<T extends Enum<T>> extends BasicTypeComparator<T>
{
+
+	private static final long serialVersionUID = 1L;
+
+
+	public EnumComparator(boolean ascending) {
+		super(ascending);
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws
IOException {
+		int i1 = firstSource.readInt();
+		int i2 = secondSource.readInt();
+		int comp = (i1 < i2 ? -1 : (i1 == i2 ? 0 : 1)); 
+		return ascendingComparison ? comp : -comp; 
+	}
+
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return true;
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return 4;
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < 4;
+	}
+
+	@Override
+	public void putNormalizedKey(T iValue, MemorySegment target, int offset, int numBytes) {
+		int value = iValue.ordinal() - Integer.MIN_VALUE;
+		
+		// see IntValue for an explanation of the logic
+		if (numBytes == 4) {
+			// default case, full normalized key
+			target.putIntBigEndian(offset, value);
+		}
+		else if (numBytes <= 0) {
+		}
+		else if (numBytes < 4) {
+			for (int i = 0; numBytes > 0; numBytes--, i++) {
+				target.put(offset + i, (byte) (value >>> ((3-i)<<3)));
+			}
+		}
+		else {
+			target.putLongBigEndian(offset, value);
+			for (int i = 4; i < numBytes; i++) {
+				target.put(offset + i, (byte) 0);
+			}
+		}
+	}
+
+	@Override
+	public EnumComparator duplicate() {
+		return new EnumComparator(ascendingComparison);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
new file mode 100644
index 0000000..b46e956
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Method;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+
+public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T>
{
+
+	private static final long serialVersionUID = 1L;
+
+	private transient T[] values;
+
+	private final Class<T> enumClass;
+
+	public EnumSerializer(Class<T> enumClass) {
+		this.enumClass = enumClass;
+		this.values = createValues(enumClass);
+	}
+
+
+	@Override
+	public boolean isImmutableType() {
+		return true;
+	}
+
+	@Override
+	public boolean isStateful() {
+		return false;
+	}
+
+	@Override
+	public T createInstance() {
+		return values[0];
+	}
+
+	@Override
+	public T copy(T from) {
+		return from;
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		return from;
+	}
+
+	@Override
+	public int getLength() {
+		return 4;
+	}
+
+	@Override
+	public void serialize(T record, DataOutputView target) throws IOException {
+		target.writeInt(record.ordinal());
+	}
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		return values[source.readInt()];
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		return values[source.readInt()];
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		target.write(source, 4);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
{
+		in.defaultReadObject();
+		this.values = createValues(this.enumClass);
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <T> T[] createValues(Class<T> enumClass) {
+		try {
+			Method valuesMethod = enumClass.getMethod("values");
+			return (T[]) valuesMethod.invoke(null);
+
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Cannot access the constants of the enum " + enumClass.getName());
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateComparatorTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateComparatorTest.java
new file mode 100644
index 0000000..b989713
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateComparatorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Date;
+import java.util.Random;
+
+public class DateComparatorTest extends ComparatorTestBase<Date> {
+
+	@Override
+	protected TypeComparator<Date> createComparator(boolean ascending) {
+		return new DateComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<Date> createSerializer() {
+		return new DateSerializer();
+	}
+
+	@Override
+	protected Date[] getSortedTestData() {
+		Random rnd = new Random(874597969123412338L);
+		long rndLong = rnd.nextLong();
+		if (rndLong < 0) {
+			rndLong = -rndLong;
+		}
+		if (rndLong == Long.MAX_VALUE) {
+			rndLong -= 3;
+		}
+		if (rndLong <= 2) {
+			rndLong += 3;
+		}
+		return new Date[]{
+			new Date(0L),
+			new Date(1L),
+			new Date(2L),
+			new Date(rndLong),
+			new Date(Long.MAX_VALUE)};
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateSerializerTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateSerializerTest.java
new file mode 100644
index 0000000..f7737b0
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateSerializerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Date;
+import java.util.Random;
+
+/**
+ * A test for the {@link org.apache.flink.api.common.typeutils.base.DateSerializer}.
+ */
+public class DateSerializerTest extends SerializerTestBase<Date> {
+	
+	@Override
+	protected TypeSerializer<Date> createSerializer() {
+		return new DateSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 8;
+	}
+	
+	@Override
+	protected Class<Date> getTypeClass() {
+		return Date.class;
+	}
+	
+	@Override
+	protected Date[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		long rndLong = rnd.nextLong();
+		return new Date[] {new Date(0L), new Date(1L),new Date(Long.MAX_VALUE), new Date(rndLong),};
+	}
+}	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
new file mode 100644
index 0000000..c19dfe3
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.EnumComparator;
+import org.apache.flink.api.common.typeutils.base.EnumSerializer;
+
+public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T>
implements AtomicType<T> {
+
+	private final Class<T> typeClass;
+
+	public EnumTypeInfo(Class<T> typeClass) {
+		if (typeClass == null) {
+			throw new NullPointerException();
+		}
+		if (!Enum.class.isAssignableFrom(typeClass) ) {
+			throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of "
+ Enum.class.getName());
+		}
+		this.typeClass = typeClass;
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public TypeComparator<T> createComparator(boolean sortOrderAscending) {
+		return new EnumComparator<T>(sortOrderAscending);
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 1;
+	}
+	
+	@Override
+	public int getTotalFields() {
+		return 1;
+	}
+
+	@Override
+	public Class<T> getTypeClass() {
+		return this.typeClass;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return true;
+	}
+
+	@Override
+	public TypeSerializer<T> createSerializer() {
+		return new EnumSerializer<T>(typeClass);
+	}
+	
+	@Override
+	public String toString() {
+		return "EnumTypeInfo<" + typeClass.getName() + ">";
+	}	
+	
+	@Override
+	public int hashCode() {
+		return typeClass.hashCode() ^ 0xd3a2646c;
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj.getClass() == EnumTypeInfo.class) {
+			return typeClass == ((EnumTypeInfo<?>) obj).typeClass;
+		} else {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index cf3751c..55faff1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -877,7 +877,6 @@ public class TypeExtractor {
 		}
 
 		if (clazz.equals(Object.class)) {
-			// TODO (merging): better throw an exception here. the runtime does not support it yet
 			return new GenericTypeInfo<X>(clazz);
 		}
 		
@@ -924,6 +923,9 @@ public class TypeExtractor {
 			throw new InvalidTypesException("Type information extraction for tuples cannot be done
based on the class.");
 		}
 
+		if(Enum.class.isAssignableFrom(clazz)) {
+			return new EnumTypeInfo(clazz);
+		}
 
 		if (alreadySeen.contains(clazz)) {
 			return new GenericTypeInfo<X>(clazz);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index da74d4b..e5ac1ca 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -288,7 +288,7 @@ public class PojoTypeExtractionTest {
 					Assert.fail("already seen");
 				}
 				dateSeen = true;
-				Assert.assertEquals(new GenericTypeInfo<Date>(Date.class), field.type);
+				Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.type);
 				Assert.assertEquals(Date.class, field.type.getTypeClass());
 			} else if(name.equals("someNumber")) {
 				if(intSeen) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index a1957f9..38a7a0e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -21,8 +21,11 @@ package org.apache.flink.test.javaApiOperators;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Date;
 import java.util.LinkedList;
 
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -34,7 +37,10 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoWithDateAndEnum;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -42,7 +48,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class ReduceITCase extends JavaProgramTestBase {
 	
-	private static int NUM_PROGRAMS = 10;
+	private static int NUM_PROGRAMS = 11;
 	
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -330,7 +336,63 @@ public class ReduceITCase extends JavaProgramTestBase {
 						"5,11,10,GHI,1\n" +
 						"5,29,0,P-),2\n" +
 						"5,25,0,P-),3\n";
-			} 
+			}
+			case 11: {
+				/**
+				 * Test support for Date and enum serialization
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				DataSet<PojoWithDateAndEnum> ds = env.generateSequence(0,2).map(new MapFunction<Long,
PojoWithDateAndEnum>() {
+					@Override
+					public PojoWithDateAndEnum map(Long value) throws Exception {
+						int l = value.intValue();
+						switch (l) {
+							case 0:
+								PojoWithDateAndEnum one = new PojoWithDateAndEnum();
+								one.group = "a";
+								one.date = new Date(666);
+								one.cat = CollectionDataSets.Category.CAT_A;
+								return one;
+							case 1:
+								PojoWithDateAndEnum two = new PojoWithDateAndEnum();
+								two.group = "a";
+								two.date = new Date(666);
+								two.cat = CollectionDataSets.Category.CAT_A;
+								return two;
+							case 2:
+								PojoWithDateAndEnum three = new PojoWithDateAndEnum();
+								three.group = "b";
+								three.date = new Date(666);
+								three.cat = CollectionDataSets.Category.CAT_B;
+								return three;
+						}
+						throw new RuntimeException("Unexpected value for l=" + l);
+					}
+				});
+				DataSet<String> res = ds.groupBy("group").reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithDateAndEnum,
String>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void reduce(Iterable<PojoWithDateAndEnum> values,
+							Collector<String> out) throws Exception {
+						for(PojoWithDateAndEnum val : values) {
+							if(val.cat == CollectionDataSets.Category.CAT_A) {
+								Assert.assertEquals("a", val.group);
+							} else if(val.cat == CollectionDataSets.Category.CAT_B) {
+								Assert.assertEquals("b", val.group);
+							} else {
+								Assert.fail("error");
+							}
+							Assert.assertEquals(666, val.date.getTime());
+						}
+						out.collect("ok");
+					}
+				});
+				
+				res.writeAsText(resultPath);
+				env.execute();
+				return "ok\nok";
+			}
 			
 			default:
 				throw new IllegalArgumentException("Invalid program id");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 0f8097a..d6a8f40 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -532,5 +531,33 @@ public class CollectionDataSets {
 		return env.fromCollection(data);
 	}
 
+	public enum Category {
+		CAT_A, CAT_B;
+	}
+
+	public static class PojoWithDateAndEnum {
+		public String group;
+		public Date date;
+		public Category cat;
+	}
+	
+	public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment
env) {
+		List<PojoWithDateAndEnum> data = new ArrayList<PojoWithDateAndEnum>();
+		
+		PojoWithDateAndEnum one = new PojoWithDateAndEnum();
+		one.group = "a"; one.date = new Date(666); one.cat = Category.CAT_A;
+		data.add(one);
+		
+		PojoWithDateAndEnum two = new PojoWithDateAndEnum();
+		two.group = "a"; two.date = new Date(666); //two.cat = Category.CAT_A;
+		data.add(two);
+		
+		PojoWithDateAndEnum three = new PojoWithDateAndEnum();
+		three.group = "b"; three.date = new Date(666); //three.cat = Category.CAT_B;
+		data.add(three);
+		
+		return env.fromCollection(data);
+	}
+
 }
 


Mime
View raw message