flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/5] git commit: [FLINK-1111] Move Basic and Array Type Information into "flink-core" Project
Date Mon, 22 Sep 2014 15:40:15 GMT
[FLINK-1111] Move Basic and Array Type Information into "flink-core" Project


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e649d71d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e649d71d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e649d71d

Branch: refs/heads/master
Commit: e649d71d5e115c1008cfedb32b571e651b2477be
Parents: 24a9b1e
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Sep 22 17:31:26 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Sep 22 17:31:26 2014 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/AvroInputFormat.java      |   2 +-
 .../io/AvroInputFormatTypeExtractionTest.java   |   2 +-
 .../mapred/HadoopInputFormat.java               |   3 +-
 .../mapreduce/HadoopInputFormat.java            |   3 +-
 .../api/java/io/jdbc/example/JDBCExample.java   |   6 +-
 .../spargel/java/VertexCentricIteration.java    |   3 +-
 .../flink/streaming/api/JobGraphBuilder.java    |   2 +-
 .../flink/streaming/api/StreamConfig.java       |   2 +-
 .../api/datastream/BatchedDataStream.java       |   2 +-
 .../api/datastream/ConnectedDataStream.java     |   2 +-
 .../streaming/api/datastream/DataStream.java    |   2 +-
 .../api/datastream/SplitDataStream.java         |   2 +-
 .../api/streamcomponent/CoStreamTask.java       |   2 +-
 .../api/streamcomponent/InputHandler.java       |   2 +-
 .../api/streamcomponent/OutputHandler.java      |   2 +-
 .../streamrecord/StreamRecordSerializer.java    |   2 +-
 .../serialization/TypeSerializerWrapper.java    |   2 +-
 .../flink/streaming/util/MockCoInvokable.java   |   2 +-
 .../flink/streaming/util/MockInvokable.java     |   2 +-
 .../apache/flink/compiler/dag/SinkJoiner.java   |   2 +-
 .../compiler/dag/WorksetIterationNode.java      |   2 +-
 .../compiler/postpass/JavaApiPostPass.java      |   6 +-
 .../compiler/postpass/RecordModelPostPass.java  |   6 +-
 .../flink/compiler/util/NoOpBinaryUdfOp.java    |   2 +-
 .../flink/compiler/util/NoOpUnaryUdfOp.java     |   2 +-
 .../compiler/FeedbackPropertiesMatchTest.java   |   2 +-
 .../apache/flink/compiler/plan/ChannelTest.java |   2 +-
 .../common/functions/InvalidTypesException.java |  46 ++
 .../functions/RichGroupReduceFunction.java      |   2 +-
 .../operators/BinaryOperatorInformation.java    |   2 +-
 .../flink/api/common/operators/Operator.java    |   2 +-
 .../common/operators/OperatorInformation.java   |   2 +-
 .../operators/UnaryOperatorInformation.java     |   2 +-
 .../operators/base/BulkIterationBase.java       |   2 +-
 .../flink/api/common/typeinfo/AtomicType.java   |  30 ++
 .../api/common/typeinfo/BasicArrayTypeInfo.java | 131 ++++++
 .../api/common/typeinfo/BasicTypeInfo.java      | 180 ++++++++
 .../api/common/typeinfo/CompositeType.java      |  30 ++
 .../api/common/typeinfo/NothingTypeInfo.java    |  55 +++
 .../common/typeinfo/PrimitiveArrayTypeInfo.java | 117 +++++
 .../api/common/typeinfo/TypeInformation.java    |  36 ++
 .../typeutils/base/GenericArraySerializer.java  | 157 +++++++
 .../typeutils/record/RecordComparator.java      | 422 +++++++++++++++++++
 .../record/RecordComparatorFactory.java         | 152 +++++++
 .../typeutils/record/RecordPairComparator.java  | 106 +++++
 .../record/RecordPairComparatorFactory.java     | 105 +++++
 .../typeutils/record/RecordSerializer.java      | 114 +++++
 .../record/RecordSerializerFactory.java         |  75 ++++
 .../org/apache/flink/types/NothingTypeInfo.java |  54 ---
 .../org/apache/flink/types/TypeInformation.java |  36 --
 .../java/org/apache/flink/api/java/DataSet.java |   2 +-
 .../flink/api/java/ExecutionEnvironment.java    |   4 +-
 .../java/functions/InvalidTypesException.java   |  46 --
 .../api/java/functions/SemanticPropUtil.java    |   2 +-
 .../flink/api/java/io/CsvOutputFormat.java      |   2 +-
 .../java/io/LocalCollectionOutputFormat.java    |   2 +-
 .../java/operators/BulkIterationResultSet.java  |   2 +-
 .../api/java/operators/CoGroupOperator.java     |   2 +-
 .../flink/api/java/operators/CrossOperator.java |   2 +-
 .../flink/api/java/operators/DataSink.java      |   5 +-
 .../flink/api/java/operators/DataSource.java    |   2 +-
 .../api/java/operators/DeltaIteration.java      |   2 +-
 .../java/operators/DeltaIterationResultSet.java |   2 +-
 .../api/java/operators/DistinctOperator.java    |   3 +-
 .../api/java/operators/FlatMapOperator.java     |   3 +-
 .../api/java/operators/GroupReduceOperator.java |   3 +-
 .../api/java/operators/IterativeDataSet.java    |   2 +-
 .../flink/api/java/operators/JoinOperator.java  |   2 +-
 .../apache/flink/api/java/operators/Keys.java   |   2 +-
 .../flink/api/java/operators/MapOperator.java   |   3 +-
 .../java/operators/MapPartitionOperator.java    |   2 +-
 .../flink/api/java/operators/Operator.java      |   2 +-
 .../api/java/operators/ProjectOperator.java     |   2 +-
 .../api/java/operators/ReduceOperator.java      |   3 +-
 .../api/java/operators/SingleInputOperator.java |   3 +-
 .../java/operators/SingleInputUdfOperator.java  |   3 +-
 .../api/java/operators/SortedGrouping.java      |   3 +-
 .../api/java/operators/TwoInputOperator.java    |   3 +-
 .../api/java/operators/TwoInputUdfOperator.java |   3 +-
 .../api/java/operators/UnsortedGrouping.java    |   2 +-
 .../PlanBothUnwrappingCoGroupOperator.java      |   2 +-
 .../PlanBothUnwrappingJoinOperator.java         |   2 +-
 .../translation/PlanFilterOperator.java         |   2 +-
 .../PlanLeftUnwrappingCoGroupOperator.java      |   2 +-
 .../PlanLeftUnwrappingJoinOperator.java         |   2 +-
 .../translation/PlanProjectOperator.java        |   2 +-
 .../PlanRightUnwrappingCoGroupOperator.java     |   2 +-
 .../PlanRightUnwrappingJoinOperator.java        |   2 +-
 .../PlanUnwrappingReduceGroupOperator.java      |   2 +-
 .../PlanUnwrappingReduceOperator.java           |   2 +-
 .../api/java/record/operators/FileDataSink.java |   2 +-
 .../java/record/operators/GenericDataSink.java  |   2 +-
 .../flink/api/java/typeutils/AtomicType.java    |  30 --
 .../api/java/typeutils/BasicArrayTypeInfo.java  | 132 ------
 .../flink/api/java/typeutils/BasicTypeInfo.java | 181 --------
 .../flink/api/java/typeutils/CompositeType.java |  30 --
 .../api/java/typeutils/GenericTypeInfo.java     |   3 +-
 .../java/typeutils/InputTypeConfigurable.java   |   2 +-
 .../api/java/typeutils/ObjectArrayTypeInfo.java |   7 +-
 .../flink/api/java/typeutils/PojoField.java     |   2 +-
 .../flink/api/java/typeutils/PojoTypeInfo.java  |   4 +-
 .../java/typeutils/PrimitiveArrayTypeInfo.java  | 118 ------
 .../api/java/typeutils/RecordTypeInfo.java      |   4 +-
 .../api/java/typeutils/ResultTypeQueryable.java |   2 +-
 .../flink/api/java/typeutils/TupleTypeInfo.java |   4 +-
 .../api/java/typeutils/TupleTypeInfoBase.java   |   3 +-
 .../flink/api/java/typeutils/TypeExtractor.java |   7 +-
 .../api/java/typeutils/TypeInfoParser.java      |   5 +-
 .../flink/api/java/typeutils/ValueTypeInfo.java |   5 +-
 .../api/java/typeutils/WritableTypeInfo.java    |   5 +-
 .../runtime/CopyableValueComparator.java        |   6 +-
 .../runtime/GenericArraySerializer.java         | 157 -------
 .../runtime/record/RecordComparator.java        | 422 -------------------
 .../runtime/record/RecordComparatorFactory.java | 152 -------
 .../runtime/record/RecordPairComparator.java    | 106 -----
 .../record/RecordPairComparatorFactory.java     | 105 -----
 .../runtime/record/RecordSerializer.java        | 114 -----
 .../runtime/record/RecordSerializerFactory.java |  75 ----
 .../java/functions/SelectByFunctionsTest.java   |   2 +-
 .../java/functions/SemanticPropUtilTest.java    |   5 +-
 .../SemanticPropertiesProjectionTest.java       |   3 +-
 .../apache/flink/api/java/io/CSVReaderTest.java |   5 +-
 .../api/java/io/CollectionInputFormatTest.java  |   4 +-
 .../java/operator/AggregateOperatorTest.java    |   4 +-
 .../api/java/operator/CoGroupOperatorTest.java  |   4 +-
 .../api/java/operator/CrossOperatorTest.java    |   4 +-
 .../api/java/operator/DistinctOperatorTest.java |   4 +-
 .../flink/api/java/operator/GroupingTest.java   |   2 +-
 .../api/java/operator/JoinOperatorTest.java     |   4 +-
 .../api/java/operator/MaxByOperatorTest.java    |   2 +-
 .../api/java/operator/MinByOperatorTest.java    |   2 +-
 .../java/operator/ProjectionOperatorTest.java   |   4 +-
 .../translation/ReduceTranslationTests.java     |   3 +-
 .../type/extractor/PojoTypeInformationTest.java |   2 +-
 .../TypeExtractorInputFormatsTest.java          |   4 +-
 .../java/type/extractor/TypeExtractorTest.java  |  10 +-
 .../api/java/typeutils/TypeInfoParserTest.java  |   9 +-
 .../AbstractGenericArraySerializerTest.java     |   2 +-
 .../typeutils/runtime/PojoSerializerTest.java   |   3 +-
 .../runtime/operators/CachedMatchTaskTest.java  |   4 +-
 .../operators/CoGroupTaskExternalITCase.java    |   4 +-
 .../runtime/operators/CoGroupTaskTest.java      |   4 +-
 .../operators/CombineTaskExternalITCase.java    |   3 +-
 .../runtime/operators/CombineTaskTest.java      |   3 +-
 .../runtime/operators/DataSinkTaskTest.java     |   3 +-
 .../operators/MatchTaskExternalITCase.java      |   5 +-
 .../flink/runtime/operators/MatchTaskTest.java  |   4 +-
 .../operators/ReduceTaskExternalITCase.java     |   5 +-
 .../flink/runtime/operators/ReduceTaskTest.java |   5 +-
 .../operators/chaining/ChainTaskTest.java       |   4 +-
 .../drivers/AllGroupReduceDriverTest.java       |   2 +-
 .../operators/drivers/AllReduceDriverTest.java  |   2 +-
 .../operators/hash/HashMatchIteratorITCase.java |   6 +-
 .../runtime/operators/hash/HashTableITCase.java |   5 +-
 .../hash/ReOpenableHashTableITCase.java         |   6 +-
 .../resettable/BlockResettableIteratorTest.java |   3 +-
 ...lockResettableMutableObjectIteratorTest.java |   3 +-
 ...lingResettableMutableObjectIteratorTest.java |   3 +-
 .../CombiningUnilateralSortMergerITCase.java    |   5 +-
 .../operators/sort/ExternalSortITCase.java      |   5 +-
 .../operators/sort/MergeIteratorTest.java       |   4 +-
 .../operators/sort/NormalizedKeySorterTest.java |   4 +-
 .../sort/SortMergeCoGroupIteratorITCase.java    |   6 +-
 .../sort/SortMergeMatchIteratorITCase.java      |   6 +-
 .../operators/testutils/DriverTestBase.java     |   5 +-
 .../operators/testutils/TaskTestBase.java       |   2 +-
 .../operators/util/HashVsSortMiniBenchmark.java |   6 +-
 .../operators/util/OutputEmitterTest.java       |   4 +-
 .../operators/util/RecordOutputEmitterTest.java |   2 +-
 .../runtime/util/KeyGroupedIteratorTest.java    |   4 +-
 .../scala/operators/ScalaCsvInputFormat.java    |   3 +-
 .../scala/operators/ScalaCsvOutputFormat.java   |   4 +-
 .../org/apache/flink/api/scala/DataSet.scala    |   2 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |   6 +-
 .../apache/flink/api/scala/GroupedDataSet.scala |   2 +-
 .../apache/flink/api/scala/coGroupDataSet.scala |   2 +-
 .../api/scala/codegen/TypeInformationGen.scala  |   8 +-
 .../apache/flink/api/scala/crossDataSet.scala   |   2 +-
 .../apache/flink/api/scala/joinDataSet.scala    |   2 +-
 .../org/apache/flink/api/scala/package.scala    |   2 +-
 .../api/scala/typeutils/CaseClassTypeInfo.scala |   5 +-
 .../flink/api/scala/typeutils/TypeUtils.scala   |   2 +-
 .../api/scala/unfinishedKeyPairOperation.scala  |   2 +-
 .../SemanticPropertiesTranslationTest.scala     |   3 +-
 .../scala/io/CollectionInputFormatTest.scala    |   2 +-
 .../translation/ReduceTranslationTest.scala     |   3 +-
 .../api/scala/runtime/TupleSerializerTest.scala |   2 +-
 .../scala/types/TypeInformationGenTest.scala    |   1 +
 .../BroadcastVarsNepheleITCase.java             |   2 +-
 .../KMeansIterativeNepheleITCase.java           |   4 +-
 .../ConnectedComponentsNepheleITCase.java       |   6 +-
 .../IterationWithChainingNepheleITCase.java     |   4 +-
 .../CompensatableDanglingPageRank.java          |   6 +-
 .../util/CollectionDataSets.java                |   3 +-
 194 files changed, 2009 insertions(+), 2029 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index a85eb22..d962ce3 100644
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -31,11 +31,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.avro.FSDataInputStreamWrapper;
 import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.InstantiationUtil;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
index efe45fd..4db9c8a 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
@@ -20,12 +20,12 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.TypeInformation;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
index f1edc26..935ec9f 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
@@ -26,11 +26,11 @@ import java.util.ArrayList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -43,7 +43,6 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
-import org.apache.flink.types.TypeInformation;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
index d9da2b4..070d097 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
@@ -27,11 +27,11 @@ import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -43,7 +43,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
 import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.flink.types.TypeInformation;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
index 7d0c5e8..7238e94 100644
--- a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.api.java.io.jdbc.example;
 
-import static org.apache.flink.api.java.typeutils.BasicTypeInfo.DOUBLE_TYPE_INFO;
-import static org.apache.flink.api.java.typeutils.BasicTypeInfo.INT_TYPE_INFO;
-import static org.apache.flink.api.java.typeutils.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 
 import java.sql.Connection;
 import java.sql.DriverManager;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index 37cc549..8dff287 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -25,11 +25,11 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.Validate;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,7 +38,6 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 837265e..238860f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -44,7 +45,6 @@ import org.apache.flink.streaming.api.streamcomponent.StreamTask;
 import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
-import org.apache.flink.types.TypeInformation;
 
 /**
  * Object for building Apache Flink stream processing job graphs

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index c2a4c21..d2e108c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -31,7 +32,6 @@ import org.apache.flink.streaming.api.streamcomponent.StreamComponentException;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
-import org.apache.flink.types.TypeInformation;
 
 public class StreamConfig {
 	private static final String INPUT_TYPE = "inputType_";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index 2565ce1..962663a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
@@ -31,7 +32,6 @@ import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupedBatchReduceInvokable;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.types.TypeInformation;
 
 /**
  * A {@link BatchedDataStream} represents a data stream whose elements are

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 6108eec..dc799f9 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
@@ -35,7 +36,6 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoStreamReduceInvokable;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
-import org.apache.flink.types.TypeInformation;
 
 /**
  * The ConnectedDataStream represents a stream for two different data types. It

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 970415b..cb76ba0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -63,7 +64,6 @@ import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
-import org.apache.flink.types.TypeInformation;
 
 /**
  * A DataStream represents a stream of elements of the same type. A DataStream

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 838f228..9a64bf4 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.api.datastream;
 
 import java.util.Arrays;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.types.TypeInformation;
 
 /**
  * The SplitDataStream represents an operator that has been split using an

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index ce2b7e8..17d2d6d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.streamcomponent;
 
 import java.util.ArrayList;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.io.network.api.MutableRecordReader;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
@@ -27,7 +28,6 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
 import org.apache.flink.streaming.io.CoRecordReader;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.MutableObjectIterator;
 
 public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java
index b1587da..97ce322 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.streamcomponent;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
@@ -27,7 +28,6 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.MutableObjectIterator;
 
 public class InputHandler<IN> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
index 7382d7d..721ef0e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
@@ -32,7 +33,6 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.StreamRecordWriter;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.types.TypeInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index 6e2fc0a..d313c11 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -20,10 +20,10 @@ package org.apache.flink.streaming.api.streamrecord;
 
 import java.io.IOException;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.TypeInformation;
 
 public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
index 69d596e..8408fc1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.util.serialization;
 
 import java.io.Serializable;
 
-import org.apache.flink.types.TypeInformation;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 public abstract class TypeSerializerWrapper<T>
 		implements Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
index 2215fcd..24f46aa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
@@ -23,13 +23,13 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
 public class MockCoInvokable<IN1, IN2, OUT> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
index 47a64ac..469d6c5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
@@ -23,11 +23,11 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
index 98bdc23..64d0945 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
@@ -23,13 +23,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.operators.OperatorDescriptorDual;
 import org.apache.flink.compiler.operators.UtilSinkJoinOpDescriptor;
 import org.apache.flink.compiler.util.NoOpBinaryUdfOp;
 import org.apache.flink.types.Nothing;
-import org.apache.flink.types.NothingTypeInfo;
 
 /**
  * This class represents a utility node that is not part of the actual plan. It is used for plans with multiple data sinks to

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
index b1ad630..b344ee7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.PactCompiler.InterestingPropertyVisitor;
@@ -53,7 +54,6 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.types.Nothing;
-import org.apache.flink.types.NothingTypeInfo;
 import org.apache.flink.util.Visitor;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
index 425e4ac..a7ace1a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
@@ -30,6 +30,9 @@ import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.CompositeType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
@@ -37,8 +40,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.AtomicType;
-import org.apache.flink.api.java.typeutils.CompositeType;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeComparatorFactory;
 import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
@@ -60,7 +61,6 @@ import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
 import org.apache.flink.compiler.plan.WorksetPlanNode;
 import org.apache.flink.compiler.util.NoOpUnaryUdfOp;
 import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.types.TypeInformation;
 
 /**
  * The post-optimizer plan traversal. This traversal fills in the API specific utilities (serializers and

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/RecordModelPostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/RecordModelPostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/RecordModelPostPass.java
index 84fc877..93bbc07 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/RecordModelPostPass.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/RecordModelPostPass.java
@@ -27,9 +27,9 @@ import org.apache.flink.api.common.operators.base.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.record.RecordComparatorFactory;
-import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
+import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.CompilerPostPassException;
 import org.apache.flink.compiler.plan.DualInputPlanNode;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
index 9980248..2272176 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.RecordOperator;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Key;
-import org.apache.flink.types.TypeInformation;
 
 
 public class NoOpBinaryUdfOp<OUT> extends DualInputOperator<OUT, OUT, OUT, NoOpFunction> implements RecordOperator {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
index e2d688a..e3fe15e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.operators.RecordOperator;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Key;
-import org.apache.flink.types.TypeInformation;
 
 
 public class NoOpUnaryUdfOp<OUT> extends SingleInputOperator<OUT, OUT, NoOpFunction> implements RecordOperator {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
index 0fbf072..2756151 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.compiler.dag.DataSourceNode;
 import org.apache.flink.compiler.dag.MapNode;
 import org.apache.flink.compiler.dag.MatchNode;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-compiler/src/test/java/org/apache/flink/compiler/plan/ChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/plan/ChannelTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/plan/ChannelTest.java
index 46dff18..1898783 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/plan/ChannelTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/plan/ChannelTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.compiler.plan;
 
 import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
-import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.compiler.dag.DataSourceNode;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.SourcePlanNode;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java
new file mode 100644
index 0000000..4c93027
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/InvalidTypesException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.InvalidProgramException;
+
+/**
+ * A special case of the {@link InvalidProgramException}, indicating that the types used in
+ * an operation are invalid or inconsistent. 
+ */
+public class InvalidTypesException extends InvalidProgramException {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a new exception with no message.
+	 */
+	public InvalidTypesException() {
+		super();
+	}
+
+	/**
+	 * Creates a new exception with the given message.
+	 * 
+	 * @param message The exception message.
+	 */
+	public InvalidTypesException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
index 6c7edff..f950dad 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
@@ -50,7 +50,7 @@ public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunct
 	 * This method is only ever invoked when the subclass of {@link RichGroupReduceFunction}
 	 * adds the {@link Combinable} annotation, or if the <i>combinable</i> flag is set when defining
 	 * the <i>reduceGroup<i> operation via
-	 * {@link org.apache.flink.api.java.operators.GroupReduceOperator#setCombinable(boolean)}.
+	 * org.apache.flink.api.java.operators.GroupReduceOperator#setCombinable(boolean).
 	 * <p>
 	 * Since the reduce function will be called on the result of this method, it is important that this
 	 * method returns the same data type as it consumes. By default, this method only calls the

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/operators/BinaryOperatorInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/BinaryOperatorInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/BinaryOperatorInformation.java
index ca1693a..6fc6098 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/BinaryOperatorInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/BinaryOperatorInformation.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators;
 
-import org.apache.flink.types.TypeInformation;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
  *  A class for holding information about a single input operator, such as input/output TypeInformation.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index 4bcfde2..5807af3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -22,8 +22,8 @@ package org.apache.flink.api.common.operators;
 import java.util.List;
 
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Visitable;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/operators/OperatorInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/OperatorInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/OperatorInformation.java
index 004bed7..b86b9a0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/OperatorInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/OperatorInformation.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators;
 
-import org.apache.flink.types.TypeInformation;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
  *  A class for holding information about an operator, such as input/output TypeInformation.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/operators/UnaryOperatorInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/UnaryOperatorInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/UnaryOperatorInformation.java
index 4ee36d5..be3b75c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/UnaryOperatorInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/UnaryOperatorInformation.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators;
 
-import org.apache.flink.types.TypeInformation;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
  *  A class for holding information about a single input operator, such as input/output TypeInformation.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index 81c6679..196cbeb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -38,10 +38,10 @@ import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.Nothing;
-import org.apache.flink.types.NothingTypeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Visitor;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java
new file mode 100644
index 0000000..d806b32
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+
+
+/**
+ *
+ */
+public interface AtomicType<T> {
+	
+	TypeComparator<T> createComparator(boolean sortOrderAscending);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
new file mode 100644
index 0000000..3b7f305
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
+
+public class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
+
+	public static final BasicArrayTypeInfo<String[], String> STRING_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<String[], String>(String[].class, BasicTypeInfo.STRING_TYPE_INFO);
+	
+	public static final BasicArrayTypeInfo<Boolean[], Boolean> BOOLEAN_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Boolean[], Boolean>(Boolean[].class, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+	public static final BasicArrayTypeInfo<Byte[], Byte> BYTE_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Byte[], Byte>(Byte[].class, BasicTypeInfo.BYTE_TYPE_INFO);
+	public static final BasicArrayTypeInfo<Short[], Short> SHORT_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Short[], Short>(Short[].class, BasicTypeInfo.SHORT_TYPE_INFO);
+	public static final BasicArrayTypeInfo<Integer[], Integer> INT_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Integer[], Integer>(Integer[].class, BasicTypeInfo.INT_TYPE_INFO);
+	public static final BasicArrayTypeInfo<Long[], Long> LONG_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Long[], Long>(Long[].class, BasicTypeInfo.LONG_TYPE_INFO);
+	public static final BasicArrayTypeInfo<Float[], Float> FLOAT_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Float[], Float>(Float[].class, BasicTypeInfo.FLOAT_TYPE_INFO);
+	public static final BasicArrayTypeInfo<Double[], Double> DOUBLE_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Double[], Double>(Double[].class, BasicTypeInfo.DOUBLE_TYPE_INFO);
+	public static final BasicArrayTypeInfo<Character[], Character> CHAR_ARRAY_TYPE_INFO = new BasicArrayTypeInfo<Character[], Character>(Character[].class, BasicTypeInfo.CHAR_TYPE_INFO);
+	
+	// --------------------------------------------------------------------------------------------
+
+	private final Class<T> arrayClass;
+	private final Class<C> componentClass;
+	private final TypeInformation<C> componentInfo;
+
+	@SuppressWarnings("unchecked")
+	private BasicArrayTypeInfo(Class<T> arrayClass, BasicTypeInfo<C> componentInfo) {
+		this.arrayClass = arrayClass;
+		this.componentClass = (Class<C>) arrayClass.getComponentType();
+		this.componentInfo = componentInfo;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 1;
+	}
+
+	@Override
+	public Class<T> getTypeClass() {
+		return this.arrayClass;
+	}
+
+	public Class<C> getComponentTypeClass() {
+		return this.componentClass;
+	}
+	
+	public TypeInformation<C> getComponentInfo() {
+		return componentInfo;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return false;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public TypeSerializer<T> createSerializer() {
+		// special case the string array
+		if (componentClass.equals(String.class)) {
+			return (TypeSerializer<T>) StringArraySerializer.INSTANCE;
+		} else {
+			return (TypeSerializer<T>) new GenericArraySerializer<C>(this.componentClass, this.componentInfo.createSerializer());
+		}
+	}
+	
+	@Override
+	public String toString() {
+		return this.getClass().getSimpleName()+"<"+this.componentInfo+">";
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	public static <X, C> BasicArrayTypeInfo<X, C> getInfoFor(Class<X> type) {
+		if (!type.isArray()) {
+			throw new InvalidTypesException("The given class is no array.");
+		}
+
+		// basic type arrays
+		return (BasicArrayTypeInfo<X, C>) TYPES.get(type);
+	}
+
+	private static final Map<Class<?>, BasicArrayTypeInfo<?, ?>> TYPES = new HashMap<Class<?>, BasicArrayTypeInfo<?, ?>>();
+
+	static {
+		TYPES.put(String[].class, STRING_ARRAY_TYPE_INFO);
+		TYPES.put(Boolean[].class, BOOLEAN_ARRAY_TYPE_INFO);
+		TYPES.put(Byte[].class, BYTE_ARRAY_TYPE_INFO);
+		TYPES.put(Short[].class, SHORT_ARRAY_TYPE_INFO);
+		TYPES.put(Integer[].class, INT_ARRAY_TYPE_INFO);
+		TYPES.put(Long[].class, LONG_ARRAY_TYPE_INFO);
+		TYPES.put(Float[].class, FLOAT_ARRAY_TYPE_INFO);
+		TYPES.put(Double[].class, DOUBLE_ARRAY_TYPE_INFO);
+		TYPES.put(Character[].class, CHAR_ARRAY_TYPE_INFO);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
new file mode 100644
index 0000000..789ed7e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanComparator;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteComparator;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.CharComparator;
+import org.apache.flink.api.common.typeutils.base.CharSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatComparator;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortComparator;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+
+/**
+ *
+ */
+public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
+
+	public static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<String>(String.class, StringSerializer.INSTANCE, StringComparator.class);
+	public static final BasicTypeInfo<Boolean> BOOLEAN_TYPE_INFO = new BasicTypeInfo<Boolean>(Boolean.class, BooleanSerializer.INSTANCE, BooleanComparator.class);
+	public static final BasicTypeInfo<Byte> BYTE_TYPE_INFO = new BasicTypeInfo<Byte>(Byte.class, ByteSerializer.INSTANCE, ByteComparator.class);
+	public static final BasicTypeInfo<Short> SHORT_TYPE_INFO = new BasicTypeInfo<Short>(Short.class, ShortSerializer.INSTANCE, ShortComparator.class);
+	public static final BasicTypeInfo<Integer> INT_TYPE_INFO = new BasicTypeInfo<Integer>(Integer.class, IntSerializer.INSTANCE, IntComparator.class);
+	public static final BasicTypeInfo<Long> LONG_TYPE_INFO = new BasicTypeInfo<Long>(Long.class, LongSerializer.INSTANCE, LongComparator.class);
+	public static final BasicTypeInfo<Float> FLOAT_TYPE_INFO = new BasicTypeInfo<Float>(Float.class, FloatSerializer.INSTANCE, FloatComparator.class);
+	public static final BasicTypeInfo<Double> DOUBLE_TYPE_INFO = new BasicTypeInfo<Double>(Double.class, DoubleSerializer.INSTANCE, DoubleComparator.class);
+	public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<Character>(Character.class, CharSerializer.INSTANCE, CharComparator.class);
+	
+	// --------------------------------------------------------------------------------------------
+
+	private final Class<T> clazz;
+	
+	private final TypeSerializer<T> serializer;
+	
+	private final Class<? extends TypeComparator<T>> comparatorClass;
+	
+	
+	private BasicTypeInfo(Class<T> clazz, TypeSerializer<T> serializer, Class<? extends TypeComparator<T>> comparatorClass) {
+		this.clazz = clazz;
+		this.serializer = serializer;
+		this.comparatorClass = comparatorClass;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean isBasicType() {
+		return true;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 1;
+	}
+
+	@Override
+	public Class<T> getTypeClass() {
+		return this.clazz;
+	}
+	
+	@Override
+	public boolean isKeyType() {
+		return true;
+	}
+	
+	@Override
+	public TypeSerializer<T> createSerializer() {
+		return this.serializer;
+	}
+	
+	@Override
+	public TypeComparator<T> createComparator(boolean sortOrderAscending) {
+		return instantiateComparator(comparatorClass, sortOrderAscending);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return this.clazz.hashCode();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof BasicTypeInfo) {
+			@SuppressWarnings("unchecked")
+			BasicTypeInfo<T> other = (BasicTypeInfo<T>) obj;
+			return this.clazz.equals(other.clazz);
+		} else {
+			return false;
+		}
+	}
+	
+	@Override
+	public String toString() {
+		return clazz.getSimpleName();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static <X> BasicTypeInfo<X> getInfoFor(Class<X> type) {
+		if (type == null) {
+			throw new NullPointerException();
+		}
+		
+		@SuppressWarnings("unchecked")
+		BasicTypeInfo<X> info = (BasicTypeInfo<X>) TYPES.get(type);
+		return info;
+	}
+	
+	private static <X> TypeComparator<X> instantiateComparator(Class<? extends TypeComparator<X>> comparatorClass, boolean ascendingOrder) {
+		try {
+			Constructor<? extends TypeComparator<X>> constructor = comparatorClass.getConstructor(boolean.class);
+			return constructor.newInstance(ascendingOrder);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not initialize basic comparator " + comparatorClass.getName(), e);
+		}
+	}
+	
+	private static final Map<Class<?>, BasicTypeInfo<?>> TYPES = new HashMap<Class<?>, BasicTypeInfo<?>>();
+	
+	static {
+		TYPES.put(String.class, STRING_TYPE_INFO);
+		TYPES.put(Boolean.class, BOOLEAN_TYPE_INFO);
+		TYPES.put(boolean.class, BOOLEAN_TYPE_INFO);
+		TYPES.put(Byte.class, BYTE_TYPE_INFO);
+		TYPES.put(byte.class, BYTE_TYPE_INFO);
+		TYPES.put(Short.class, SHORT_TYPE_INFO);
+		TYPES.put(short.class, SHORT_TYPE_INFO);
+		TYPES.put(Integer.class, INT_TYPE_INFO);
+		TYPES.put(int.class, INT_TYPE_INFO);
+		TYPES.put(Long.class, LONG_TYPE_INFO);
+		TYPES.put(long.class, LONG_TYPE_INFO);
+		TYPES.put(Float.class, FLOAT_TYPE_INFO);
+		TYPES.put(float.class, FLOAT_TYPE_INFO);
+		TYPES.put(Double.class, DOUBLE_TYPE_INFO);
+		TYPES.put(double.class, DOUBLE_TYPE_INFO);
+		TYPES.put(Character.class, CHAR_TYPE_INFO);
+		TYPES.put(char.class, CHAR_TYPE_INFO);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/CompositeType.java
new file mode 100644
index 0000000..b4f6b87
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/CompositeType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+
+
+/**
+ *
+ */
+public interface CompositeType<T> {
+
+	TypeComparator<T> createComparator(int[] logicalKeyFields, boolean[] orders);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
new file mode 100644
index 0000000..fe0a93e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
@@ -0,0 +1,55 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.Nothing;
+
+public class NothingTypeInfo extends TypeInformation<Nothing> {
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 0;
+	}
+
+	@Override
+	public Class<Nothing> getTypeClass() {
+		return Nothing.class;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<Nothing> createSerializer() {
+		throw new RuntimeException("The Nothing type cannot have a serializer.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
new file mode 100644
index 0000000..81ad1aa
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.DoublePrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
+
+public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
+
+	public static final PrimitiveArrayTypeInfo<boolean[]> BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<boolean[]>(boolean[].class, BooleanPrimitiveArraySerializer.INSTANCE);
+	public static final PrimitiveArrayTypeInfo<byte[]> BYTE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<byte[]>(byte[].class, BytePrimitiveArraySerializer.INSTANCE);
+	public static final PrimitiveArrayTypeInfo<short[]> SHORT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<short[]>(short[].class, ShortPrimitiveArraySerializer.INSTANCE);
+	public static final PrimitiveArrayTypeInfo<int[]> INT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<int[]>(int[].class, IntPrimitiveArraySerializer.INSTANCE);
+	public static final PrimitiveArrayTypeInfo<long[]> LONG_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<long[]>(long[].class, LongPrimitiveArraySerializer.INSTANCE);
+	public static final PrimitiveArrayTypeInfo<float[]> FLOAT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<float[]>(float[].class, FloatPrimitiveArraySerializer.INSTANCE);
+	public static final PrimitiveArrayTypeInfo<double[]> DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<double[]>(double[].class, DoublePrimitiveArraySerializer.INSTANCE);
+	public static final PrimitiveArrayTypeInfo<char[]> CHAR_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<char[]>(char[].class, CharPrimitiveArraySerializer.INSTANCE);
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private final Class<T> arrayClass;
+	private final TypeSerializer<T> serializer;
+
+	private PrimitiveArrayTypeInfo(Class<T> arrayClass, TypeSerializer<T> serializer) {
+		this.arrayClass = arrayClass;
+		this.serializer = serializer;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 1;
+	}
+
+	@Override
+	public Class<T> getTypeClass() {
+		return this.arrayClass;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<T> createSerializer() {
+		return this.serializer;
+	}
+	
+	@Override
+	public String toString() {
+		return arrayClass.getComponentType().getName() + "[]";
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	public static <X> PrimitiveArrayTypeInfo<X> getInfoFor(Class<X> type) {
+		if (!type.isArray()) {
+			throw new InvalidTypesException("The given class is no array.");
+		}
+
+		// basic type arrays
+		return (PrimitiveArrayTypeInfo<X>) TYPES.get(type);
+	}
+
+	private static final Map<Class<?>, PrimitiveArrayTypeInfo<?>> TYPES = new HashMap<Class<?>, PrimitiveArrayTypeInfo<?>>();
+
+	static {
+		TYPES.put(boolean[].class, BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO);
+		TYPES.put(byte[].class, BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
+		TYPES.put(short[].class, SHORT_PRIMITIVE_ARRAY_TYPE_INFO);
+		TYPES.put(int[].class, INT_PRIMITIVE_ARRAY_TYPE_INFO);
+		TYPES.put(long[].class, LONG_PRIMITIVE_ARRAY_TYPE_INFO);
+		TYPES.put(float[].class, FLOAT_PRIMITIVE_ARRAY_TYPE_INFO);
+		TYPES.put(double[].class, DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO);
+		TYPES.put(char[].class, CHAR_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e649d71d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
new file mode 100644
index 0000000..85275c3
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public abstract class TypeInformation<T> {
+	
+	public abstract boolean isBasicType();
+	
+	public abstract boolean isTupleType();
+	
+	public abstract int getArity();
+	
+	public abstract Class<T> getTypeClass();
+	
+	public abstract boolean isKeyType();
+	
+	public abstract TypeSerializer<T> createSerializer();
+}


Mime
View raw message