flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/3] flink git commit: [FLINK-3856] [core] [api-extending] Create types for java.sql.Date/Time/Timestamp
Date Fri, 13 May 2016 07:10:40 GMT
Repository: flink
Updated Branches:
  refs/heads/master 43bd6f6e4 -> 3c90d3654


[FLINK-3856] [core] [api-extending] Create types for java.sql.Date/Time/Timestamp

This closes #1959


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbd02d24
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbd02d24
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbd02d24

Branch: refs/heads/master
Commit: bbd02d24bc7547e2c9384d713b20f86682cac08c
Parents: 43bd6f6
Author: twalthr <twalthr@apache.org>
Authored: Mon May 2 16:31:45 2016 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri May 13 00:46:18 2016 +0200

----------------------------------------------------------------------
 .../api/common/typeinfo/SqlTimeTypeInfo.java    | 168 +++++++++++++++++++
 .../common/typeutils/base/DateComparator.java   |  40 +++--
 .../common/typeutils/base/DateSerializer.java   |  22 ++-
 .../typeutils/base/SqlDateSerializer.java       | 105 ++++++++++++
 .../typeutils/base/SqlTimeSerializer.java       | 104 ++++++++++++
 .../typeutils/base/SqlTimestampComparator.java  |  98 +++++++++++
 .../typeutils/base/SqlTimestampSerializer.java  | 113 +++++++++++++
 .../flink/api/java/typeutils/TypeExtractor.java |  21 +++
 .../typeutils/base/SqlDateComparatorTest.java   |  49 ++++++
 .../typeutils/base/SqlDateSerializerTest.java   |  55 ++++++
 .../typeutils/base/SqlTimeComparatorTest.java   |  48 ++++++
 .../typeutils/base/SqlTimeSerializerTest.java   |  55 ++++++
 .../base/SqlTimestampComparatorTest.java        |  51 ++++++
 .../base/SqlTimestampSerializerTest.java        |  58 +++++++
 .../api/java/typeutils/TypeExtractorTest.java   |  36 ++++
 15 files changed, 996 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/SqlTimeTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/SqlTimeTypeInfo.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/SqlTimeTypeInfo.java
new file mode 100644
index 0000000..a05227c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/SqlTimeTypeInfo.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.reflect.Constructor;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Objects;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DateComparator;
+import org.apache.flink.api.common.typeutils.base.SqlDateSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Type information for Java SQL Date/Time/Timestamp.
+ */
+@PublicEvolving
+public class SqlTimeTypeInfo<T> extends TypeInformation<T> implements AtomicType<T>
{
+
+	private static final long serialVersionUID = -132955295409131880L;
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public static final SqlTimeTypeInfo<Date> DATE = new SqlTimeTypeInfo<>(Date.class,
SqlDateSerializer.INSTANCE, (Class) DateComparator.class);
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public static final SqlTimeTypeInfo<Time> TIME = new SqlTimeTypeInfo<>(Time.class,
SqlTimeSerializer.INSTANCE, (Class) DateComparator.class);
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public static final SqlTimeTypeInfo<Timestamp> TIMESTAMP = new SqlTimeTypeInfo<>(Timestamp.class,
SqlTimestampSerializer.INSTANCE, (Class) SqlTimestampComparator.class);
+
+	// --------------------------------------------------------------------------------------------
+
+	private final Class<T> clazz;
+
+	private final TypeSerializer<T> serializer;
+
+	private final Class<? extends TypeComparator<T>> comparatorClass;
+
+	protected SqlTimeTypeInfo(Class<T> clazz, TypeSerializer<T> serializer, Class<?
extends TypeComparator<T>> comparatorClass) {
+		this.clazz = checkNotNull(clazz);
+		this.serializer = checkNotNull(serializer);
+		this.comparatorClass = checkNotNull(comparatorClass);
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 1;
+	}
+
+	@Override
+	public int getTotalFields() {
+		return 1;
+	}
+
+	@Override
+	public Class<T> getTypeClass() {
+		return clazz;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return true;
+	}
+
+	@Override
+	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
+		return serializer;
+	}
+
+	@Override
+	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig
executionConfig) {
+		return instantiateComparator(comparatorClass, sortOrderAscending);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(clazz, serializer, comparatorClass);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof SqlTimeTypeInfo;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof SqlTimeTypeInfo) {
+			@SuppressWarnings("unchecked")
+			SqlTimeTypeInfo<T> other = (SqlTimeTypeInfo<T>) obj;
+
+			return other.canEqual(this) &&
+				this.clazz == other.clazz &&
+				serializer.equals(other.serializer) &&
+				this.comparatorClass == other.comparatorClass;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return clazz.getSimpleName();
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static <X> TypeComparator<X> instantiateComparator(Class<? extends
TypeComparator<X>> comparatorClass, boolean ascendingOrder) {
+		try {
+			Constructor<? extends TypeComparator<X>> constructor = comparatorClass.getConstructor(boolean.class);
+			return constructor.newInstance(ascendingOrder);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not initialize comparator " + comparatorClass.getName(),
e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public static <X> SqlTimeTypeInfo<X> getInfoFor(Class<X> type) {
+		checkNotNull(type);
+
+		if (type == Date.class) {
+			return (SqlTimeTypeInfo<X>) DATE;
+		}
+		else if (type == Time.class) {
+			return (SqlTimeTypeInfo<X>) TIME;
+		}
+		else if (type == Timestamp.class) {
+			return (SqlTimeTypeInfo<X>) TIMESTAMP;
+		}
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java
index fbf73ba..d959919 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java
@@ -30,17 +30,13 @@ public final class DateComparator extends BasicTypeComparator<Date>
{
 
 	private static final long serialVersionUID = 1L;
 
-
 	public DateComparator(boolean ascending) {
 		super(ascending);
 	}
 
 	@Override
 	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws
IOException {
-		long l1 = firstSource.readLong();
-		long l2 = secondSource.readLong();
-		int comp = (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1)); 
-		return ascendingComparison ? comp : -comp;
+		return compareSerializedDate(firstSource, secondSource, ascendingComparison);
 	}
 
 	@Override
@@ -59,16 +55,35 @@ public final class DateComparator extends BasicTypeComparator<Date>
{
 	}
 
 	@Override
-	public void putNormalizedKey(Date lValue, MemorySegment target, int offset, int numBytes)
{
-		long value = lValue.getTime() - Long.MIN_VALUE;
-		
+	public void putNormalizedKey(Date record, MemorySegment target, int offset, int numBytes)
{
+		putNormalizedKeyDate(record, target, offset, numBytes);
+	}
+
+	@Override
+	public DateComparator duplicate() {
+		return new DateComparator(ascendingComparison);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//                           Static Helpers for Date Comparison
+	// --------------------------------------------------------------------------------------------
+
+	public static int compareSerializedDate(DataInputView firstSource, DataInputView secondSource,
+			boolean ascendingComparison) throws IOException {
+		final long l1 = firstSource.readLong();
+		final long l2 = secondSource.readLong();
+		final int comp = (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1));
+		return ascendingComparison ? comp : -comp;
+	}
+
+	public static void putNormalizedKeyDate(Date record, MemorySegment target, int offset, int
numBytes) {
+		final long value = record.getTime() - Long.MIN_VALUE;
+
 		// see IntValue for an explanation of the logic
 		if (numBytes == 8) {
 			// default case, full normalized key
 			target.putLongBigEndian(offset, value);
 		}
-		else if (numBytes <= 0) {
-		}
 		else if (numBytes < 8) {
 			for (int i = 0; numBytes > 0; numBytes--, i++) {
 				target.put(offset + i, (byte) (value >>> ((7-i)<<3)));
@@ -81,9 +96,4 @@ public final class DateComparator extends BasicTypeComparator<Date>
{
 			}
 		}
 	}
-
-	@Override
-	public DateComparator duplicate() {
-		return new DateComparator(ascendingComparison);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
index 1d3751b..3f27de2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -18,18 +18,17 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
+import java.io.IOException;
+import java.util.Date;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-import java.io.IOException;
-import java.util.Date;
-
 @Internal
 public final class DateSerializer extends TypeSerializerSingleton<Date> {
 
 	private static final long serialVersionUID = 1L;
-	
+
 	public static final DateSerializer INSTANCE = new DateSerializer();
 
 	@Override
@@ -50,10 +49,9 @@ public final class DateSerializer extends TypeSerializerSingleton<Date>
{
 		return new Date(from.getTime());
 	}
 
-	
 	@Override
 	public Date copy(Date from, Date reuse) {
-		if(from == null) {
+		if (from == null) {
 			return null;
 		}
 		reuse.setTime(from.getTime());
@@ -67,8 +65,8 @@ public final class DateSerializer extends TypeSerializerSingleton<Date>
{
 
 	@Override
 	public void serialize(Date record, DataOutputView target) throws IOException {
-		if(record == null) {
-			target.writeLong(-1L);
+		if (record == null) {
+			target.writeLong(Long.MIN_VALUE);
 		} else {
 			target.writeLong(record.getTime());
 		}
@@ -76,8 +74,8 @@ public final class DateSerializer extends TypeSerializerSingleton<Date>
{
 
 	@Override
 	public Date deserialize(DataInputView source) throws IOException {
-		long v = source.readLong();
-		if(v == -1L) {
+		final long v = source.readLong();
+		if (v == Long.MIN_VALUE) {
 			return null;
 		} else {
 			return new Date(v);
@@ -86,8 +84,8 @@ public final class DateSerializer extends TypeSerializerSingleton<Date>
{
 	
 	@Override
 	public Date deserialize(Date reuse, DataInputView source) throws IOException {
-		long v = source.readLong();
-		if(v == -1L) {
+		final long v = source.readLong();
+		if (v == Long.MIN_VALUE) {
 			return null;
 		}
 		reuse.setTime(v);

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
new file mode 100644
index 0000000..9e1bbfd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.sql.Date;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+@Internal
+public final class SqlDateSerializer extends TypeSerializerSingleton<Date> {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final SqlDateSerializer INSTANCE = new SqlDateSerializer();
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public Date createInstance() {
+		return new Date(0L);
+	}
+
+	@Override
+	public Date copy(Date from) {
+		if (from == null) {
+			return null;
+		}
+		return new Date(from.getTime());
+	}
+
+	@Override
+	public Date copy(Date from, Date reuse) {
+		if (from == null) {
+			return null;
+		}
+		reuse.setTime(from.getTime());
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return 8;
+	}
+
+	@Override
+	public void serialize(Date record, DataOutputView target) throws IOException {
+		if (record == null) {
+			target.writeLong(Long.MIN_VALUE);
+		} else {
+			target.writeLong(record.getTime());
+		}
+	}
+
+	@Override
+	public Date deserialize(DataInputView source) throws IOException {
+		final long v = source.readLong();
+		if (v == Long.MIN_VALUE) {
+			return null;
+		} else {
+			return new Date(v);
+		}
+	}
+
+	@Override
+	public Date deserialize(Date reuse, DataInputView source) throws IOException {
+		final long v = source.readLong();
+		if (v == Long.MIN_VALUE) {
+			return null;
+		}
+		reuse.setTime(v);
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		target.writeLong(source.readLong());
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof SqlDateSerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
new file mode 100644
index 0000000..544cf0f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+import java.sql.Time;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+@Internal
+public final class SqlTimeSerializer extends TypeSerializerSingleton<Time> {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final SqlTimeSerializer INSTANCE = new SqlTimeSerializer();
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public Time createInstance() {
+		return new Time(0L);
+	}
+
+	@Override
+	public Time copy(Time from) {
+		if (from == null) {
+			return null;
+		}
+		return new Time(from.getTime());
+	}
+
+	@Override
+	public Time copy(Time from, Time reuse) {
+		if (from == null) {
+			return null;
+		}
+		reuse.setTime(from.getTime());
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return 8;
+	}
+
+	@Override
+	public void serialize(Time record, DataOutputView target) throws IOException {
+		if (record == null) {
+			target.writeLong(Long.MIN_VALUE);
+		} else {
+			target.writeLong(record.getTime());
+		}
+	}
+
+	@Override
+	public Time deserialize(DataInputView source) throws IOException {
+		final long v = source.readLong();
+		if (v == Long.MIN_VALUE) {
+			return null;
+		} else {
+			return new Time(v);
+		}
+	}
+
+	@Override
+	public Time deserialize(Time reuse, DataInputView source) throws IOException {
+		final long v = source.readLong();
+		if (v == Long.MIN_VALUE) {
+			return null;
+		}
+		reuse.setTime(v);
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		target.writeLong(source.readLong());
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof SqlTimeSerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparator.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparator.java
new file mode 100644
index 0000000..92411b2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * Comparator for comparing Java SQL Timestamps.
+ */
+@Internal
+public final class SqlTimestampComparator extends BasicTypeComparator<java.util.Date>
{
+
+	private static final long serialVersionUID = 1L;
+
+	public SqlTimestampComparator(boolean ascending) {
+		super(ascending);
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws
IOException {
+		// compare Date part
+		final int comp = DateComparator.compareSerializedDate(firstSource, secondSource, ascendingComparison);
+		// compare nanos
+		if (comp == 0) {
+			final int i1 = firstSource.readInt();
+			final int i2 = secondSource.readInt();
+			final int comp2 = (i1 < i2 ? -1 : (i1 == i2 ? 0 : 1));
+			return ascendingComparison ? comp2 : -comp2;
+		}
+		return comp;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return true;
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return 12;
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return keyBytes < 12;
+	}
+
+	@Override
+	public void putNormalizedKey(java.util.Date record, MemorySegment target, int offset, int
numBytes) {
+		// put Date key
+		DateComparator.putNormalizedKeyDate(record, target, offset, numBytes > 8 ? 8 : numBytes);
+		numBytes -= 8;
+		offset += 8;
+		if (numBytes <= 0) {
+			// nothing to do
+		}
+		// put nanos
+		else if (numBytes < 4) {
+			final int nanos = ((Timestamp) record).getNanos();
+			for (int i = 0; numBytes > 0; numBytes--, i++) {
+				target.put(offset + i, (byte) (nanos >>> ((3-i)<<3)));
+			}
+		}
+		// put nanos with padding
+		else {
+			final int nanos = ((Timestamp) record).getNanos();
+			target.putIntBigEndian(offset, nanos);
+			for (int i = 4; i < numBytes; i++) {
+				target.put(offset + i, (byte) 0);
+			}
+		}
+	}
+
+	@Override
+	public SqlTimestampComparator duplicate() {
+		return new SqlTimestampComparator(ascendingComparison);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
new file mode 100644
index 0000000..dbbd3ff
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+@Internal
+public final class SqlTimestampSerializer extends TypeSerializerSingleton<Timestamp>
{
+
+	private static final long serialVersionUID = 1L;
+
+	public static final SqlTimestampSerializer INSTANCE = new SqlTimestampSerializer();
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public Timestamp createInstance() {
+		return new Timestamp(0L);
+	}
+
+	@Override
+	public Timestamp copy(Timestamp from) {
+		if (from == null) {
+			return null;
+		}
+		final Timestamp t = new Timestamp(from.getTime());
+		t.setNanos(from.getNanos());
+		return t;
+	}
+
+	@Override
+	public Timestamp copy(Timestamp from, Timestamp reuse) {
+		if (from == null) {
+			return null;
+		}
+		reuse.setTime(from.getTime());
+		reuse.setNanos(from.getNanos());
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return 12;
+	}
+
+	@Override
+	public void serialize(Timestamp record, DataOutputView target) throws IOException {
+		if (record == null) {
+			target.writeLong(Long.MIN_VALUE);
+			target.writeInt(0);
+		} else {
+			target.writeLong(record.getTime());
+			target.writeInt(record.getNanos());
+		}
+	}
+
+	@Override
+	public Timestamp deserialize(DataInputView source) throws IOException {
+		final long v = source.readLong();
+		if (v == Long.MIN_VALUE) {
+			return null;
+		} else {
+			final Timestamp t = new Timestamp(v);
+			t.setNanos(source.readInt());
+			return t;
+		}
+	}
+
+	@Override
+	public Timestamp deserialize(Timestamp reuse, DataInputView source) throws IOException {
+		final long v = source.readLong();
+		if (v == Long.MIN_VALUE) {
+			return null;
+		}
+		reuse.setTime(v);
+		reuse.setNanos(source.readInt());
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		target.writeLong(source.readLong());
+		target.writeInt(source.readInt());
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof SqlTimestampSerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 0469cc2..f2b9fd2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -53,6 +53,7 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -1031,6 +1032,20 @@ public class TypeExtractor {
 				}
 				
 			}
+			// check for Java SQL time types
+			else if (typeInfo instanceof SqlTimeTypeInfo) {
+
+				TypeInformation<?> actual;
+				// check if SQL time type at all
+				if (!(type instanceof Class<?>) || (actual = SqlTimeTypeInfo.getInfoFor((Class<?>)
type)) == null) {
+					throw new InvalidTypesException("SQL time type expected.");
+				}
+				// check if correct SQL time type
+				if (!typeInfo.equals(actual)) {
+					throw new InvalidTypesException("SQL time type '" + typeInfo + "' expected but was '"
+ actual + "'.");
+				}
+
+			}
 			// check for Java Tuples
 			else if (typeInfo instanceof TupleTypeInfo) {
 				// check if tuple at all
@@ -1520,6 +1535,12 @@ public class TypeExtractor {
 		if (basicTypeInfo != null) {
 			return basicTypeInfo;
 		}
+
+		// check for SQL time types
+		TypeInformation<OUT> timeTypeInfo = SqlTimeTypeInfo.getInfoFor(clazz);
+		if (timeTypeInfo != null) {
+			return timeTypeInfo;
+		}
 		
 		// check for subclasses of Value
 		if (Value.class.isAssignableFrom(clazz)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
new file mode 100644
index 0000000..cedefe7
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class SqlDateComparatorTest extends ComparatorTestBase<Date> {
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TypeComparator<Date> createComparator(boolean ascending) {
+		return (TypeComparator) new DateComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<Date> createSerializer() {
+		return new SqlDateSerializer();
+	}
+
+	@Override
+	protected Date[] getSortedTestData() {
+		return new Date[] {
+			Date.valueOf("1970-01-01"),
+			Date.valueOf("1990-10-14"),
+			Date.valueOf("2013-08-12"),
+			Date.valueOf("2040-05-12")
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializerTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializerTest.java
new file mode 100644
index 0000000..a24051d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.sql.Date;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * A test for the {@link SqlDateSerializer}.
+ */
+public class SqlDateSerializerTest extends SerializerTestBase<Date> {
+
+	@Override
+	protected TypeSerializer<Date> createSerializer() {
+		return new SqlDateSerializer();
+	}
+
+	@Override
+	protected int getLength() {
+		return 8;
+	}
+
+	@Override
+	protected Class<Date> getTypeClass() {
+		return Date.class;
+	}
+
+	@Override
+	protected Date[] getTestData() {
+		return new Date[] {
+			new Date(0L),
+			Date.valueOf("1970-01-01"),
+			Date.valueOf("1990-10-14"),
+			Date.valueOf("2013-08-12"),
+			Date.valueOf("2040-05-12")
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
new file mode 100644
index 0000000..2b5cfdf
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.sql.Time;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class SqlTimeComparatorTest extends ComparatorTestBase<Time> {
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TypeComparator<Time> createComparator(boolean ascending) {
+		return (TypeComparator) new DateComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<Time> createSerializer() {
+		return new SqlTimeSerializer();
+	}
+
+	@Override
+	protected Time[] getSortedTestData() {
+		return new Time[] {
+			Time.valueOf("00:00:00"),
+			Time.valueOf("02:42:85"),
+			Time.valueOf("14:15:59"),
+			Time.valueOf("18:00:45")
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
new file mode 100644
index 0000000..4d16050
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.sql.Time;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * A test for the {@link SqlTimeSerializer}.
+ */
+public class SqlTimeSerializerTest extends SerializerTestBase<Time> {
+
+	@Override
+	protected TypeSerializer<Time> createSerializer() {
+		return new SqlTimeSerializer();
+	}
+
+	@Override
+	protected int getLength() {
+		return 8;
+	}
+
+	@Override
+	protected Class<Time> getTypeClass() {
+		return Time.class;
+	}
+
+	@Override
+	protected Time[] getTestData() {
+		return new Time[] {
+			new Time(0L),
+			Time.valueOf("00:00:00"),
+			Time.valueOf("02:42:85"),
+			Time.valueOf("14:15:59"),
+			Time.valueOf("18:00:45")
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
new file mode 100644
index 0000000..0b8d294
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.sql.Timestamp;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class SqlTimestampComparatorTest extends ComparatorTestBase<Timestamp> {
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TypeComparator<Timestamp> createComparator(boolean ascending) {
+		return (TypeComparator) new SqlTimestampComparator(ascending);
+	}
+
+	@Override
+	protected TypeSerializer<Timestamp> createSerializer() {
+		return new SqlTimestampSerializer();
+	}
+
+	@Override
+	protected Timestamp[] getSortedTestData() {
+		return new Timestamp[] {
+			Timestamp.valueOf("1970-01-01 00:00:00.000"),
+			Timestamp.valueOf("1990-10-14 02:42:85.123"),
+			Timestamp.valueOf("1990-10-14 02:42:85.123000001"),
+			Timestamp.valueOf("1990-10-14 02:42:85.123000002"),
+			Timestamp.valueOf("2013-08-12 14:15:59.478"),
+			Timestamp.valueOf("2013-08-12 14:15:59.479"),
+			Timestamp.valueOf("2040-05-12 18:00:45.999")
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
new file mode 100644
index 0000000..70172d5
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.sql.Timestamp;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * A test for the {@link SqlTimestampSerializer}.
+ */
+public class SqlTimestampSerializerTest extends SerializerTestBase<Timestamp> {
+
+	@Override
+	protected TypeSerializer<Timestamp> createSerializer() {
+		return new SqlTimestampSerializer();
+	}
+
+	@Override
+	protected int getLength() {
+		return 12;
+	}
+
+	@Override
+	protected Class<Timestamp> getTypeClass() {
+		return Timestamp.class;
+	}
+
+	@Override
+	protected Timestamp[] getTestData() {
+		return new Timestamp[] {
+			new Timestamp(0L),
+			Timestamp.valueOf("1970-01-01 00:00:00.000"),
+			Timestamp.valueOf("1990-10-14 02:42:85.123"),
+			Timestamp.valueOf("1990-10-14 02:42:85.123000001"),
+			Timestamp.valueOf("1990-10-14 02:42:85.123000002"),
+			Timestamp.valueOf("2013-08-12 14:15:59.478"),
+			Timestamp.valueOf("2013-08-12 14:15:59.479"),
+			Timestamp.valueOf("2040-05-12 18:00:45.999")
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd02d24/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
index 74c01ef..8fc1533 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
@@ -23,6 +23,9 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -41,6 +44,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
@@ -2050,4 +2054,36 @@ public class TypeExtractorTest {
 		Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, TypeExtractor.getForObject(new BigInteger("42")));
 		Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, TypeExtractor.getForObject(new BigDecimal("42.42")));
 	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testSqlTimeTypes() {
+		MapFunction<?, ?> function = new MapFunction<Tuple3<Date, Time, Timestamp>,
Tuple3<Date, Time, Timestamp>>() {
+			@Override
+			public Tuple3<Date, Time, Timestamp> map(Tuple3<Date, Time, Timestamp> value)
throws Exception {
+				return null;
+			}
+		};
+
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+			function,
+			(TypeInformation) TypeInformation.of(new TypeHint<Tuple3<Date, Time, Timestamp>>()
{
+		}));
+
+		Assert.assertTrue(ti.isTupleType());
+		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
+		Assert.assertEquals(SqlTimeTypeInfo.DATE, tti.getTypeAt(0));
+		Assert.assertEquals(SqlTimeTypeInfo.TIME, tti.getTypeAt(1));
+		Assert.assertEquals(SqlTimeTypeInfo.TIMESTAMP, tti.getTypeAt(2));
+
+		// use getForClass()
+		Assert.assertEquals(tti.getTypeAt(0), TypeExtractor.getForClass(Date.class));
+		Assert.assertEquals(tti.getTypeAt(1), TypeExtractor.getForClass(Time.class));
+		Assert.assertEquals(tti.getTypeAt(2), TypeExtractor.getForClass(Timestamp.class));
+
+		// use getForObject()
+		Assert.assertEquals(SqlTimeTypeInfo.DATE, TypeExtractor.getForObject(Date.valueOf("1998-12-12")));
+		Assert.assertEquals(SqlTimeTypeInfo.TIME, TypeExtractor.getForObject(Time.valueOf("12:37:45")));
+		Assert.assertEquals(SqlTimeTypeInfo.TIMESTAMP, TypeExtractor.getForObject(Timestamp.valueOf("1998-12-12
12:37:45")));
+	}
 }


Mime
View raw message