flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [2/5] flink git commit: [FLINK-3093] Introduce annotations for interface stability in remaining modules
Date Fri, 05 Feb 2016 13:44:27 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/PartitionSortedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/PartitionSortedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/PartitionSortedDataSet.scala
index 44d1c4c..c924a76 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/PartitionSortedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/PartitionSortedDataSet.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala
 
+import org.apache.flink.annotation.Public
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.java.operators.SortPartitionOperator
 
@@ -28,6 +29,7 @@ import scala.reflect.ClassTag
  *
  * @tparam T The type of the DataSet, i.e., the type of the elements of the DataSet.
  */
+@Public
 class PartitionSortedDataSet[T: ClassTag](set: SortPartitionOperator[T])
   extends DataSet[T](set) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
index ace0790..0fd0298 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/UnfinishedCoGroupOperation.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala
 
+import org.apache.flink.annotation.Public
 import org.apache.flink.api.common.functions.CoGroupFunction
 import org.apache.flink.api.common.operators.Keys
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -30,17 +31,18 @@ import scala.reflect.ClassTag
 
 /**
  * An unfinished coGroup operation that results from [[DataSet.coGroup]] The keys for the left and
- * right side must be specified using first `where` and then `isEqualTo`. For example:
+ * right side must be specified using first `where` and then `equalTo`. For example:
  *
  * {{{
  *   val left = ...
  *   val right = ...
- *   val coGroupResult = left.coGroup(right).where(...).isEqualTo(...)
+ *   val coGroupResult = left.coGroup(right).where(...).equalTo(...)
  * }}}
  *
  * @tparam L The type of the left input of the coGroup.
  * @tparam R The type of the right input of the coGroup.
  */
+@Public
 class UnfinishedCoGroupOperation[L: ClassTag, R: ClassTag](
                                                             leftInput: DataSet[L],
                                                             rightInput: DataSet[R])

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
index 6aa4e08..9b614fd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
@@ -17,9 +17,11 @@
  */
 package org.apache.flink.api.scala.hadoop.mapred
 
+import org.apache.flink.annotation.Public
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase
 import org.apache.hadoop.mapred.{JobConf, InputFormat}
 
+@Public
 class HadoopInputFormat[K, V](
     mapredInputFormat: InputFormat[K, V],
     keyClass: Class[K],

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
index 68b4922..ad5f282 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
@@ -17,9 +17,11 @@
  */
 package org.apache.flink.api.scala.hadoop.mapred
 
+import org.apache.flink.annotation.Public
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
 import org.apache.hadoop.mapred.{OutputCommitter, JobConf, OutputFormat}
 
+@Public
 class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)
   extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
index 6de658a..8efb53d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
@@ -18,9 +18,11 @@
 
 package org.apache.flink.api.scala.hadoop.mapreduce
 
+import org.apache.flink.annotation.Public
 import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase
 import org.apache.hadoop.mapreduce.{InputFormat, Job}
 
+@Public
 class HadoopInputFormat[K, V](
     mapredInputFormat: InputFormat[K, V],
     keyClass: Class[K],

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
index b8ba3c1..8095304 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
@@ -18,9 +18,11 @@
 
 package org.apache.flink.api.scala.hadoop.mapreduce
 
+import org.apache.flink.annotation.Public
 import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase
 import org.apache.hadoop.mapreduce.{Job, OutputFormat}
 
+@Public
 class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: Job)
   extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index 71f2bfb..ec5f366 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala
 
+import org.apache.flink.annotation.{Internal, Public}
 import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction, Partitioner, RichFlatJoinFunction}
 import org.apache.flink.api.common.operators.Keys
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
@@ -56,6 +57,7 @@ import scala.reflect.ClassTag
  * @tparam L Type of the left input of the join.
  * @tparam R Type of the right input of the join.
  */
+@Public
 class JoinDataSet[L, R](
     defaultJoin: EquiJoin[L, R, (L, R)],
     leftInput: DataSet[L],
@@ -206,6 +208,7 @@ class JoinDataSet[L, R](
   /**
    * Gets the custom partitioner used by this join, or null, if none is set.
    */
+  @Internal
   def getPartitioner[K]() : Partitioner[K] = {
     customPartitioner.asInstanceOf[Partitioner[K]]
   }
@@ -261,6 +264,7 @@ private[flink] abstract class UnfinishedJoinOperationBase[L, R, O <: JoinFunctio
  * @tparam L The type of the left input of the join.
  * @tparam R The type of the right input of the join.
  */
+@Public
 class UnfinishedJoinOperation[L, R](
     leftSet: DataSet[L],
     rightSet: DataSet[R],
@@ -268,6 +272,7 @@ class UnfinishedJoinOperation[L, R](
   extends UnfinishedJoinOperationBase[L, R, JoinDataSet[L, R]](
     leftSet, rightSet, joinHint, JoinType.INNER) {
 
+  @Internal
   override def createJoinFunctionAssigner(leftKey: Keys[L], rightKey: Keys[R]) = {
     createDefaultJoin(leftKey, rightKey)
   }
@@ -293,6 +298,7 @@ class UnfinishedJoinOperation[L, R](
  * @tparam L The type of the left input of the join.
  * @tparam R The type of the right input of the join.
  */
+@Public
 class UnfinishedOuterJoinOperation[L, R](
     leftSet: DataSet[L],
     rightSet: DataSet[R],
@@ -301,6 +307,7 @@ class UnfinishedOuterJoinOperation[L, R](
   extends UnfinishedJoinOperationBase[L, R, JoinFunctionAssigner[L, R]](
     leftSet, rightSet, joinHint, joinType) {
 
+  @Internal
   override def createJoinFunctionAssigner(leftKey: Keys[L], rightKey: Keys[R]):
       JoinFunctionAssigner[L, R] = {
     new DefaultJoinFunctionAssigner(createDefaultJoin(leftKey, rightKey))
@@ -327,6 +334,7 @@ class UnfinishedOuterJoinOperation[L, R](
 
 }
 
+@Public
 trait JoinFunctionAssigner[L, R] {
 
   def withPartitioner[K : TypeInformation](part : Partitioner[K]) : JoinFunctionAssigner[L, R]

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index e8bc3a4..eb41b4b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.scala.typeutils
 import java.util
 import java.util.regex.{Pattern, Matcher}
 
+import org.apache.flink.annotation.{Experimental, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.operators.Keys
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -37,6 +38,7 @@ import scala.collection.mutable.ArrayBuffer
  * TypeInformation for Case Classes. Creation and access is different from
  * our Java Tuples so we have to treat them differently.
  */
+@Public
 abstract class CaseClassTypeInfo[T <: Product](
     clazz: Class[T],
     val typeParamTypeInfos: Array[TypeInformation[_]],
@@ -44,6 +46,7 @@ abstract class CaseClassTypeInfo[T <: Product](
     val fieldNames: Seq[String])
   extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) {
 
+  @Experimental
   override def getGenericParameters: java.util.List[TypeInformation[_]] = {
     typeParamTypeInfos.toList.asJava
   }
@@ -60,10 +63,12 @@ abstract class CaseClassTypeInfo[T <: Product](
     Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD)
   private val PATTERN_INT_FIELD: Pattern = Pattern.compile(REGEX_INT_FIELD)
 
+  @Experimental
   def getFieldIndices(fields: Array[String]): Array[Int] = {
     fields map { x => fieldNames.indexOf(x) }
   }
 
+  @Experimental
   override def getFlatFields(
       fieldExpression: String,
       offset: Int,
@@ -145,6 +150,7 @@ abstract class CaseClassTypeInfo[T <: Product](
     }
   }
 
+  @Experimental
   override def getTypeAt[X](fieldExpression: String) : TypeInformation[X] = {
 
     val matcher: Matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression)
@@ -187,8 +193,10 @@ abstract class CaseClassTypeInfo[T <: Product](
       "\" in type " + this + ".")
   }
 
+  @Experimental
   override def getFieldNames: Array[String] = fieldNames.toArray
 
+  @Experimental
   override def getFieldIndex(fieldName: String): Int = {
     val result = fieldNames.indexOf(fieldName)
     if (result != fieldNames.lastIndexOf(fieldName)) {
@@ -198,6 +206,7 @@ abstract class CaseClassTypeInfo[T <: Product](
     }
   }
 
+  @Experimental
   override def createTypeComparatorBuilder(): TypeComparatorBuilder[T] = {
     new CaseClassTypeComparatorBuilder
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
index 2beebde..cb39e7b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.flink.annotation.{Experimental, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -26,20 +27,29 @@ import scala.collection.JavaConverters._
 /**
 * TypeInformation for [[Either]].
  */
+@Public
 class EitherTypeInfo[A, B, T <: Either[A, B]](
     val clazz: Class[T],
     val leftTypeInfo: TypeInformation[A],
     val rightTypeInfo: TypeInformation[B])
   extends TypeInformation[T] {
 
+  @Experimental
   override def isBasicType: Boolean = false
+  @Experimental
   override def isTupleType: Boolean = false
+  @Experimental
   override def isKeyType: Boolean = false
+  @Experimental
   override def getTotalFields: Int = 1
+  @Experimental
   override def getArity: Int = 1
+  @Experimental
   override def getTypeClass = clazz
+  @Experimental
   override def getGenericParameters = List[TypeInformation[_]](leftTypeInfo, rightTypeInfo).asJava
 
+  @Experimental
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     val leftSerializer = if (leftTypeInfo != null) {
       leftTypeInfo.createSerializer(executionConfig)

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
index e3d665e..79a2866 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.flink.annotation.{Experimental, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
@@ -26,24 +27,34 @@ import scala.collection.JavaConverters._
 /**
  * TypeInformation for [[Enumeration]] values.
  */
+@Public
 class EnumValueTypeInfo[E <: Enumeration](val enum: E, val clazz: Class[E#Value])
   extends TypeInformation[E#Value] with AtomicType[E#Value] {
 
   type T = E#Value
 
+  @Experimental
   override def isBasicType: Boolean = false
+  @Experimental
   override def isTupleType: Boolean = false
+  @Experimental
   override def isKeyType: Boolean = true
+  @Experimental
   override def getTotalFields: Int = 1
+  @Experimental
   override def getArity: Int = 1
+  @Experimental
   override def getTypeClass = clazz
+  @Experimental
   override def getGenericParameters = List.empty[TypeInformation[_]].asJava
 
 
+  @Experimental
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     new EnumValueSerializer[E](enum)
   }
 
+  @Experimental
   override def createComparator(ascOrder: Boolean, config: ExecutionConfig): TypeComparator[T] = {
     new EnumValueComparator[E](ascOrder)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 2aff2dd..df12955 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.flink.annotation.{Experimental, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -26,18 +27,27 @@ import scala.collection.JavaConverters._
 /**
  * TypeInformation for [[Option]].
  */
+@Public
 class OptionTypeInfo[A, T <: Option[A]](private val elemTypeInfo: TypeInformation[A])
   extends TypeInformation[T] {
 
+  @Experimental
   override def isBasicType: Boolean = false
+  @Experimental
   override def isTupleType: Boolean = false
+  @Experimental
   override def isKeyType: Boolean = false
+  @Experimental
   override def getTotalFields: Int = 1
+  @Experimental
   override def getArity: Int = 1
+  @Experimental
   override def getTypeClass = classOf[Option[_]].asInstanceOf[Class[T]]
+  @Experimental
   override def getGenericParameters = List[TypeInformation[_]](elemTypeInfo).asJava
 
 
+  @Experimental
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     if (elemTypeInfo == null) {
       // this happens when the type of a DataSet is None, i.e. DataSet[None]

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
index b59ecf7..b0f760a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
@@ -17,19 +17,28 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.flink.annotation.{Experimental, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 
+@Public
 class ScalaNothingTypeInfo extends TypeInformation[Nothing] {
 
+  @Experimental
   override def isBasicType: Boolean = false
+  @Experimental
   override def isTupleType: Boolean = false
+  @Experimental
   override def getArity: Int = 0
+  @Experimental
   override def getTotalFields: Int = 0
+  @Experimental
   override def getTypeClass: Class[Nothing] = classOf[Nothing]
+  @Experimental
   override def isKeyType: Boolean = false
 
+  @Experimental
   override def createSerializer(config: ExecutionConfig): TypeSerializer[Nothing] =
     (new NothingSerializer).asInstanceOf[TypeSerializer[Nothing]]
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index 8948b0c..855caa9 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.flink.annotation.{Experimental, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -26,19 +27,28 @@ import scala.collection.JavaConverters._
 /**
  * TypeInformation for Scala Collections.
  */
+@Public
 abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
     val clazz: Class[T],
     val elementTypeInfo: TypeInformation[E])
   extends TypeInformation[T] {
 
+  @Experimental
   override def isBasicType: Boolean = false
+  @Experimental
   override def isTupleType: Boolean = false
+  @Experimental
   override def isKeyType: Boolean = false
+  @Experimental
   override def getTotalFields: Int = 1
+  @Experimental
   override def getArity: Int = 1
+  @Experimental
   override def getTypeClass: Class[T] = clazz
+  @Experimental
   override def getGenericParameters = List[TypeInformation[_]](elementTypeInfo).asJava
 
+  @Experimental
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T]
 
   override def equals(other: Any): Boolean = {

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
index f3f2ce2..880c636 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.flink.annotation.{Experimental, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -28,17 +29,26 @@ import scala.util.Try
 /**
  * TypeInformation for [[scala.util.Try]].
  */
+@Public
 class TryTypeInfo[A, T <: Try[A]](val elemTypeInfo: TypeInformation[A])
   extends TypeInformation[T] {
 
+  @Experimental
   override def isBasicType: Boolean = false
+  @Experimental
   override def isTupleType: Boolean = false
+  @Experimental
   override def isKeyType: Boolean = false
+  @Experimental
   override def getTotalFields: Int = 1
+  @Experimental
   override def getArity: Int = 1
+  @Experimental
   override def getTypeClass = classOf[Try[_]].asInstanceOf[Class[T]]
+  @Experimental
   override def getGenericParameters = List[TypeInformation[_]](elemTypeInfo).asJava
 
+  @Experimental
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
     if (elemTypeInfo == null) {
       // this happens when the type of a DataSet is None, i.e. DataSet[Failure]

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala
index e0a226b..fa46a8a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala
@@ -17,18 +17,27 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import org.apache.flink.annotation.{Experimental, Public}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 
+@Public
 class UnitTypeInfo extends TypeInformation[Unit] {
+  @Experimental
   override def isBasicType(): Boolean = false
+  @Experimental
   override def isTupleType(): Boolean = false
+  @Experimental
   override def getArity(): Int = 0
+  @Experimental
   override def getTotalFields(): Int = 0
+  @Experimental
   override def getTypeClass(): Class[Unit] = classOf[Unit]
+  @Experimental
   override def isKeyType(): Boolean = false
 
+  @Experimental
   override def createSerializer(config: ExecutionConfig): TypeSerializer[Unit] =
     (new UnitSerializer).asInstanceOf[TypeSerializer[Unit]]
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
index d11cf8c..7a03053 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala
 
+import org.apache.flink.annotation.Experimental
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.Utils
 import org.apache.flink.api.java.Utils.ChecksumHashCode
@@ -27,6 +28,7 @@ import org.apache.flink.util.AbstractID
 import _root_.scala.language.implicitConversions
 import _root_.scala.reflect.ClassTag
 
+
 package object utils {
 
   /**
@@ -35,6 +37,7 @@ package object utils {
    *
    * @param self Data Set
    */
+  @Experimental
   implicit class DataSetUtils[T: TypeInformation : ClassTag](val self: DataSet[T]) {
 
     /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
index 152c75a..174707c 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.fs;
 
+
 /**
  * A clock that can provide the current time.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
index 2bab8cf..41663df 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.fs;
 
+
 /**
  * A {@link Clock} that uses {@code System.currentTimeMillis()} to determine the system time.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
index db46d00..a879f17 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api;
 
+import org.apache.flink.annotation.Public;
+
 /**
  * The checkpointing mode defines what consistency guarantees the system gives in the presence of
  * failures.
@@ -29,6 +31,7 @@ package org.apache.flink.streaming.api;
 * in a simpler fashion that typically encounters some duplicates upon recovery
  * ({@link #AT_LEAST_ONCE})</p> 
  */
+@Public
 public enum CheckpointingMode {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
index 125ca65..270e2d6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api;
 
+
 /**
  * The time characteristic defines how the system determines time for time-dependent
  * order and operations that depend on time (such as time windows).

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
index 4bd89c4..ae02915 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.checkpoint;
 
+
 import java.io.Serializable;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 8cef5ea..b7dc795 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -66,6 +68,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggrega
  * @param <T> The type of elements in the stream.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
  */
+@Public
 public class AllWindowedStream<T, W extends Window> {
 
 	/** The data stream that is windowed by this stream */
@@ -81,6 +84,7 @@ public class AllWindowedStream<T, W extends Window> {
 	private Evictor<? super T, ? super W> evictor;
 
 
+	@Experimental
 	public AllWindowedStream(DataStream<T> input,
 			WindowAssigner<? super T, W> windowAssigner) {
 		this.input = input;
@@ -91,6 +95,7 @@ public class AllWindowedStream<T, W extends Window> {
 	/**
 	 * Sets the {@code Trigger} that should be used to trigger window emission.
 	 */
+	@Experimental
 	public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
 		this.trigger = trigger;
 		return this;
@@ -103,6 +108,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * Note: When using an evictor window performance will degrade significantly, since
 	 * pre-aggregation of window results cannot be used.
 	 */
+	@Experimental
 	public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
 		this.evictor = evictor;
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 3903015..b552a26 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -66,6 +69,7 @@ import static java.util.Objects.requireNonNull;
  *     .apply(new MyCoGroupFunction());
  * } </pre>
  */
+@Public
 public class CoGroupedStreams<T1, T2> {
 
 	/** The first input stream */
@@ -100,6 +104,7 @@ public class CoGroupedStreams<T1, T2> {
 	 * 
 	 * @param <KEY> The type of the key.
 	 */
+	@Public
 	public class Where<KEY> {
 
 		private final KeySelector<T1, KEY> keySelector1;
@@ -128,6 +133,7 @@ public class CoGroupedStreams<T1, T2> {
 		/**
 		 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs.
 		 */
+		@Public
 		public class EqualTo {
 
 			private final KeySelector<T2, KEY> keySelector2;
@@ -139,6 +145,7 @@ public class CoGroupedStreams<T1, T2> {
 			/**
 			 * Specifies the window on which the co-group operation works.
 			 */
+			@Experimental
 			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
 				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
 			}
@@ -156,6 +163,7 @@ public class CoGroupedStreams<T1, T2> {
 	 * @param <KEY> Type of the key. This must be the same for both inputs
 	 * @param <W> Type of {@link Window} on which the co-group operation works.
 	 */
+	@Public
 	public static class WithWindow<T1, T2, KEY, W extends Window> {
 		private final DataStream<T1> input1;
 		private final DataStream<T2> input2;
@@ -194,6 +202,7 @@ public class CoGroupedStreams<T1, T2> {
 		/**
 		 * Sets the {@code Trigger} that should be used to trigger window emission.
 		 */
+		@Experimental
 		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
 					windowAssigner, newTrigger, evictor);
@@ -206,6 +215,7 @@ public class CoGroupedStreams<T1, T2> {
 		 * Note: When using an evictor window performance will degrade significantly, since
 		 * pre-aggregation of window results cannot be used.
 		 */
+		@Experimental
 		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
 					windowAssigner, trigger, newEvictor);
@@ -273,6 +283,7 @@ public class CoGroupedStreams<T1, T2> {
 	/**
 	 * Internal class for implementing tagged union co-group.
 	 */
+	@Internal
 	public static class TaggedUnion<T1, T2> {
 		private final T1 one;
 		private final T2 two;

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 395b329..0d3064d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -37,6 +39,7 @@ import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
  * @param <IN1> Type of the first input data steam.
  * @param <IN2> Type of the second input data stream.
  */
+@Public
 public class ConnectedStreams<IN1, IN2> {
 
 	protected StreamExecutionEnvironment environment;
@@ -305,6 +308,7 @@ public class ConnectedStreams<IN1, IN2> {
 		return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
 	}
 
+	@Experimental
 	public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
 			TypeInformation<OUT> outTypeInfo,
 			TwoInputStreamOperator<IN1, IN2, OUT> operator) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f4b6e7f..6d2a44a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -21,6 +21,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -95,6 +98,7 @@ import com.google.common.base.Preconditions;
  *
  * @param <T> The type of the elements in this Stream
  */
+@Public
 public class DataStream<T> {
 
 	protected final StreamExecutionEnvironment environment;
@@ -117,6 +121,7 @@ public class DataStream<T> {
 	 *
 	 * @return ID of the DataStream
 	 */
+	@Internal
 	public Integer getId() {
 		return transformation.getId();
 	}
@@ -408,6 +413,7 @@ public class DataStream<T> {
 	 *
 	 * @return The DataStream with shuffle partitioning set.
 	 */
+	@Experimental
 	public DataStream<T> shuffle() {
 		return setConnectionType(new ShufflePartitioner<T>());
 	}
@@ -423,6 +429,7 @@ public class DataStream<T> {
 	 *
 	 * @return The DataStream with forward partitioning set.
 	 */
+	@Experimental
 	public DataStream<T> forward() {
 		return setConnectionType(new ForwardPartitioner<T>());
 	}
@@ -450,6 +457,7 @@ public class DataStream<T> {
 	 *
 	 * @return The DataStream with shuffle partitioning set.
 	 */
+	@Experimental
 	public DataStream<T> global() {
 		return setConnectionType(new GlobalPartitioner<T>());
 	}
@@ -481,6 +489,7 @@ public class DataStream<T> {
 	 *
 	 * @return The iterative data stream created.
 	 */
+	@Experimental
 	public IterativeStream<T> iterate() {
 		return new IterativeStream<T>(this, 0);
 	}
@@ -516,6 +525,7 @@ public class DataStream<T> {
 	 *
 	 * @return The iterative data stream created.
 	 */
+	@Experimental
 	public IterativeStream<T> iterate(long maxWaitTimeMillis) {
 		return new IterativeStream<T>(this, maxWaitTimeMillis);
 	}
@@ -603,6 +613,7 @@ public class DataStream<T> {
 	 * @see Tuple
 	 * @see DataStream
 	 */
+	@Experimental
 	public <R extends Tuple> SingleOutputStreamOperator<R, ?> project(int... fieldIndexes) {
 		return new StreamProjection<T>(this, fieldIndexes).projectTupleX();
 	}
@@ -714,6 +725,7 @@ public class DataStream<T> {
 	 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
 	 * @return The trigger windows data stream.
 	 */
+	@Experimental
 	public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
 		return new AllWindowedStream<>(this, assigner);
 	}
@@ -732,6 +744,7 @@ public class DataStream<T> {
 	 *
 	 * @param extractor The TimestampExtractor that is called for each element of the DataStream.
 	 */
+	@Experimental
 	public SingleOutputStreamOperator<T, ?> assignTimestamps(TimestampExtractor<T> extractor) {
 		// match parallelism to input, otherwise dop=1 sources could lead to some strange
 		// behaviour: the watermark will creep along very slowly because the elements
@@ -751,6 +764,7 @@ public class DataStream<T> {
 	 *
 	 * @return The closed DataStream.
 	 */
+	@Experimental
 	public DataStreamSink<T> print() {
 		PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>();
 		return addSink(printFunction);
@@ -765,6 +779,7 @@ public class DataStream<T> {
 	 *
 	 * @return The closed DataStream.
 	 */
+	@Experimental
 	public DataStreamSink<T> printToErr() {
 		PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>(true);
 		return addSink(printFunction);
@@ -782,6 +797,7 @@ public class DataStream<T> {
 	 *
 	 * @return The closed DataStream.
 	 */
+	@Experimental
 	public DataStreamSink<T> writeAsText(String path) {
 		return write(new TextOutputFormat<T>(new Path(path)), 0L);
 	}
@@ -801,6 +817,7 @@ public class DataStream<T> {
 	 *
 	 * @return The closed DataStream.
 	 */
+	@Experimental
 	public DataStreamSink<T> writeAsText(String path, long millis) {
 		TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
 		return write(tof, millis);
@@ -821,6 +838,7 @@ public class DataStream<T> {
 	 *
 	 * @return The closed DataStream.
 	 */
+	@Experimental
 	public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
 		TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
 		tof.setWriteMode(writeMode);
@@ -844,6 +862,7 @@ public class DataStream<T> {
 	 *
 	 * @return The closed DataStream.
 	 */
+	@Experimental
 	public DataStreamSink<T> writeAsText(String path, WriteMode writeMode, long millis) {
 		TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
 		tof.setWriteMode(writeMode);
@@ -862,6 +881,7 @@ public class DataStream<T> {
 	 *
 	 * @return the closed DataStream
 	 */
+	@Experimental
 	public DataStreamSink<T> writeAsCsv(String path) {
 		return writeAsCsv(path, null, 0L, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
 	}
@@ -881,6 +901,7 @@ public class DataStream<T> {
 	 *
 	 * @return the closed DataStream
 	 */
+	@Experimental
 	public DataStreamSink<T> writeAsCsv(String path, long millis) {
 		return writeAsCsv(path, null, millis, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
 	}
@@ -900,6 +921,7 @@ public class DataStream<T> {
 	 *
 	 * @return the closed DataStream
 	 */
+	@Experimental
 	public DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode) {
 		return writeAsCsv(path, writeMode, 0L, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
 	}
@@ -922,6 +944,7 @@ public class DataStream<T> {
 	 *
 	 * @return the closed DataStream
 	 */
+	@Experimental
 	public DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode, long millis) {
 		return writeAsCsv(path, writeMode, millis, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
 	}
@@ -949,6 +972,7 @@ public class DataStream<T> {
 	 * @return the closed DataStream
 	 */
 	@SuppressWarnings("unchecked")
+	@Experimental
 	public <X extends Tuple> DataStreamSink<T> writeAsCsv(
 			String path,
 			WriteMode writeMode,
@@ -983,6 +1007,7 @@ public class DataStream<T> {
 	 *            schema for serialization
 	 * @return the closed DataStream
 	 */
+	@Experimental
 	public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
 		DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema, 0));
 		returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
@@ -996,6 +1021,7 @@ public class DataStream<T> {
 	 * @param millis the write frequency
 	 * @return The closed DataStream
 	 */
+	@Experimental
 	public DataStreamSink<T> write(OutputFormat<T> format, long millis) {
 		return addSink(new FileSinkFunctionByMillis<T>(format, millis));
 	}
@@ -1014,6 +1040,7 @@ public class DataStream<T> {
 	 *            type of the return stream
 	 * @return the data stream constructed
 	 */
+	@Experimental
 	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
 
 		// read the output type of the input Transform to coax out errors about MissingTypeInfo
@@ -1078,6 +1105,7 @@ public class DataStream<T> {
 	 *
 	 * @return The Transformation
 	 */
+	@Internal
 	public StreamTransformation<T> getTransformation() {
 		return transformation;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index c3a701d..fcfe98d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
@@ -26,6 +29,7 @@ import org.apache.flink.streaming.api.transformations.SinkTransformation;
  *
  * @param <T> The type of the elements in the Stream
  */
+@Public
 public class DataStreamSink<T> {
 
 	SinkTransformation<T> transformation;
@@ -38,6 +42,7 @@ public class DataStreamSink<T> {
 	/**
 	 * Returns the transformation that contains the actual sink operator of this sink.
 	 */
+	@Internal
 	public SinkTransformation<T> getTransformation() {
 		return transformation;
 	}
@@ -65,6 +70,7 @@ public class DataStreamSink<T> {
 	 * @param uid The unique user-specified ID of this transformation.
 	 * @return The operator with the specified ID.
 	 */
+	@Experimental
 	public DataStreamSink<T> uid(String uid) {
 		transformation.setUid(uid);
 		return this;
@@ -92,6 +98,7 @@ public class DataStreamSink<T> {
 	 *
 	 * @return The sink with chaining disabled
 	 */
+	@Experimental
 	public DataStreamSink<T> disableChaining() {
 		this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index d2e04a7..a11f65b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -27,6 +28,7 @@ import org.apache.flink.streaming.api.transformations.SourceTransformation;
  * 
  * @param <T> Type of the elements in the DataStream created from the this source.
  */
+@Public
 public class DataStreamSource<T> extends SingleOutputStreamOperator<T, DataStreamSource<T>> {
 
 	boolean isParallel;

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
index 346bef9..d03e8e0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -32,6 +34,7 @@ import java.util.Collection;
  * 
  * @param <T> Type of the elements in this Stream
  */
+@Experimental
 public class IterativeStream<T> extends SingleOutputStreamOperator<T, IterativeStream<T>> {
 
 	// We store these so that we can create a co-iteration if we need to
@@ -142,6 +145,7 @@ public class IterativeStream<T> extends SingleOutputStreamOperator<T, IterativeS
 	 * @param <F>
 	 *            Type of the feedback of the iteration
 	 */
+	@Public
 	public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {
 
 		private CoFeedbackTransformation<F> coFeedbackTransformation;

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index cff9355..aa866eb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
@@ -59,6 +61,7 @@ import static java.util.Objects.requireNonNull;
  *     .apply(new MyJoinFunction());
  * } </pre>
  */
+@Public
 public class JoinedStreams<T1, T2> {
 
 	/** The first input stream */
@@ -89,10 +92,11 @@ public class JoinedStreams<T1, T2> {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * CoGrouped streams that have the key for one side defined.
+	 * Joined streams that have the key for one side defined.
 	 *
 	 * @param <KEY> The type of the key.
 	 */
+	@Public
 	public class Where<KEY> {
 
 		private final KeySelector<T1, KEY> keySelector1;
@@ -119,8 +123,9 @@ public class JoinedStreams<T1, T2> {
 		// --------------------------------------------------------------------
 
 		/**
-		 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs.
+		 * A join operation that has {@link KeySelector KeySelectors} defined for both inputs.
 		 */
+		@Public
 		public class EqualTo {
 
 			private final KeySelector<T2, KEY> keySelector2;
@@ -130,8 +135,9 @@ public class JoinedStreams<T1, T2> {
 			}
 
 			/**
-			 * Specifies the window on which the co-group operation works.
+			 * Specifies the window on which the join operation works.
 			 */
+			@Experimental
 			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
 				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
 			}
@@ -149,6 +155,7 @@ public class JoinedStreams<T1, T2> {
 	 * @param <KEY> Type of the key. This must be the same for both inputs
 	 * @param <W> Type of {@link Window} on which the join operation works.
 	 */
+	@Public
 	public static class WithWindow<T1, T2, KEY, W extends Window> {
 		
 		private final DataStream<T1> input1;
@@ -164,6 +171,7 @@ public class JoinedStreams<T1, T2> {
 
 		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
 
+		@Experimental
 		protected WithWindow(DataStream<T1> input1,
 				DataStream<T2> input2,
 				KeySelector<T1, KEY> keySelector1,
@@ -189,6 +197,7 @@ public class JoinedStreams<T1, T2> {
 		/**
 		 * Sets the {@code Trigger} that should be used to trigger window emission.
 		 */
+		@Experimental
 		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
 					windowAssigner, newTrigger, evictor);
@@ -201,6 +210,7 @@ public class JoinedStreams<T1, T2> {
 		 * Note: When using an evictor window performance will degrade significantly, since
 		 * pre-aggregation of window results cannot be used.
 		 */
+		@Experimental
 		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
 					windowAssigner, trigger, newEvictor);

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 9b567f8..6077381 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -59,6 +62,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
  * @param <T> The type of the elements in the Keyed Stream.
  * @param <KEY> The type of the key in the Keyed Stream.
  */
+@Public
 public class KeyedStream<T, KEY> extends DataStream<T> {
 
 	/** The key selector that can get the key by which the stream if partitioned from the elements */
@@ -104,6 +108,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * Gets the key selector that can get the key by which the stream if partitioned from the elements.
 	 * @return The key selector for the key.
 	 */
+	@Internal
 	public KeySelector<T, KEY> getKeySelector() {
 		return this.keySelector;
 	}
@@ -112,6 +117,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * Gets the type of the key by which the stream is partitioned. 
 	 * @return The type of the key by which the stream is partitioned.
 	 */
+	@Internal
 	public TypeInformation<KEY> getKeyType() {
 		return keyType;
 	}
@@ -126,6 +132,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	// ------------------------------------------------------------------------
 	
 	@Override
+	@Experimental
 	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
 			TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
 
@@ -215,6 +222,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
 	 * @return The trigger windows data stream.
 	 */
+	@Experimental
 	public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
 		return new WindowedStream<>(this, assigner);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 432a7b1..c2fcaaf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -35,6 +37,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
  * @param <T> The type of the elements in this Stream
  * @param <O> Type of the operator.
  */
+@Public
 public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<T, O>> extends DataStream<T> {
 
 	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
@@ -74,6 +77,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
 	 * @param uid The unique user-specified ID of this transformation.
 	 * @return The operator with the specified ID.
 	 */
+	@Experimental
 	public SingleOutputStreamOperator<T, O> uid(String uid) {
 		transformation.setUid(uid);
 		return this;
@@ -115,11 +119,13 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
 	}
 
 	@SuppressWarnings("unchecked")
+	@Experimental
 	public SingleOutputStreamOperator<T, O> shuffle() {
 		return (SingleOutputStreamOperator<T, O>) super.shuffle();
 	}
 
 	@SuppressWarnings("unchecked")
+	@Experimental
 	public SingleOutputStreamOperator<T, O> forward() {
 		return (SingleOutputStreamOperator<T, O>) super.forward();
 	}
@@ -130,6 +136,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
 	}
 
 	@SuppressWarnings("unchecked")
+	@Experimental
 	public SingleOutputStreamOperator<T, O> global() {
 		return (SingleOutputStreamOperator<T, O>) super.global();
 	}
@@ -143,6 +150,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
 	 *            The selected {@link ChainingStrategy}
 	 * @return The operator with the modified chaining strategy
 	 */
+	@Experimental
 	private SingleOutputStreamOperator<T, O> setChainingStrategy(ChainingStrategy strategy) {
 		this.transformation.setChainingStrategy(strategy);
 		return this;
@@ -157,6 +165,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
 	 * 
 	 * @return The operator with chaining disabled
 	 */
+	@Experimental
 	public SingleOutputStreamOperator<T, O> disableChaining() {
 		return setChainingStrategy(ChainingStrategy.NEVER);
 	}
@@ -168,6 +177,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
 	 * 
 	 * @return The operator with chaining set.
 	 */
+	@Experimental
 	public SingleOutputStreamOperator<T, O> startNewChain() {
 		return setChainingStrategy(ChainingStrategy.HEAD);
 	}
@@ -306,6 +316,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
 	 * 
 	 * @return The operator as a part of a new resource group.
 	 */
+	@Experimental
 	public SingleOutputStreamOperator<T, O> startNewResourceGroup() {
 		transformation.setResourceStrategy(ResourceStrategy.NEWGROUP);
 		return this;
@@ -321,6 +332,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
 	 * 
 	 * @return The operator with isolated resource group.
 	 */
+	@Experimental
 	public SingleOutputStreamOperator<T, O> isolateResources() {
 		transformation.setResourceStrategy(ResourceStrategy.ISOLATE);
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
index 11ee7f2..0f0f301 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import com.google.common.collect.Lists;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.transformations.SelectTransformation;
 import org.apache.flink.streaming.api.transformations.SplitTransformation;
@@ -30,6 +31,8 @@ import org.apache.flink.streaming.api.transformations.SplitTransformation;
  *
  * @param <OUT> The type of the elements in the Stream
  */
+
+@Experimental
 public class SplitStream<OUT> extends DataStream<OUT> {
 
 	protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index d64248f..f945399 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -80,6 +82,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
  * @param <K> The type of the key by which elements are grouped.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
  */
+@Public
 public class WindowedStream<T, K, W extends Window> {
 
 	/** The keyed data stream that is windowed by this stream */
@@ -95,6 +98,7 @@ public class WindowedStream<T, K, W extends Window> {
 	private Evictor<? super T, ? super W> evictor;
 
 
+	@Experimental
 	public WindowedStream(KeyedStream<T, K> input,
 			WindowAssigner<? super T, W> windowAssigner) {
 		this.input = input;
@@ -105,6 +109,7 @@ public class WindowedStream<T, K, W extends Window> {
 	/**
 	 * Sets the {@code Trigger} that should be used to trigger window emission.
 	 */
+	@Experimental
 	public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
 		this.trigger = trigger;
 		return this;
@@ -117,6 +122,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * Note: When using an evictor window performance will degrade significantly, since
 	 * pre-aggregation of window results cannot be used.
 	 */
+	@Experimental
 	public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
 		this.evictor = evictor;
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 409304a..6c2d72c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.environment;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
 import static java.util.Objects.requireNonNull;
@@ -25,6 +27,7 @@ import static java.util.Objects.requireNonNull;
 /**
  * Configuration that captures all checkpointing related settings.
  */
+@Public
 public class CheckpointConfig implements java.io.Serializable {
 
 	private static final long serialVersionUID = -750378776078908147L;
@@ -203,6 +206,7 @@ public class CheckpointConfig implements java.io.Serializable {
 	 * @deprecated This will be removed once iterations properly participate in checkpointing.
 	 */
 	@Deprecated
+	@Experimental
 	public boolean isForceCheckpointing() {
 		return forceCheckpointing;
 	}
@@ -215,6 +219,7 @@ public class CheckpointConfig implements java.io.Serializable {
 	 * @deprecated This will be removed once iterations properly participate in checkpointing.
 	 */
 	@Deprecated
+	@Experimental
 	public void setForceCheckpointing(boolean forceCheckpointing) {
 		this.forceCheckpointing = forceCheckpointing;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 6ad7352..f8c9c42 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.environment;
 
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory;
  * and {@link StreamExecutionEnvironment#createLocalEnvironment(int)}. The former version will pick a
  * default parallelism equal to the number of hardware contexts in the local machine.
  */
+@Public
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 9f43e62..79ddfb6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -39,6 +40,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Public
 public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f4b3184..1e29155 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.api.environment;
 import com.esotericsoftware.kryo.Serializer;
 import com.google.common.base.Preconditions;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -93,6 +96,7 @@ import static java.util.Objects.requireNonNull;
  * @see org.apache.flink.streaming.api.environment.LocalStreamEnvironment
  * @see org.apache.flink.streaming.api.environment.RemoteStreamEnvironment
  */
+@Public
 public abstract class StreamExecutionEnvironment {
 
 	/** The default name to use for a streaming job if no other name has been specified */
@@ -220,6 +224,7 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * @return StreamExecutionEnvironment with chaining disabled.
 	 */
+	@Experimental
 	public StreamExecutionEnvironment disableOperatorChaining() {
 		this.isChainingEnabled = false;
 		return this;
@@ -230,6 +235,7 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * @return {@code true} if chaining is enabled, false otherwise.
 	 */
+	@Experimental
 	public boolean isChainingEnabled() {
 		return isChainingEnabled;
 	}
@@ -315,6 +321,7 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	@Deprecated
 	@SuppressWarnings("deprecation")
+	@Experimental
 	public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
 		checkpointCfg.setCheckpointingMode(mode);
 		checkpointCfg.setCheckpointInterval(interval);
@@ -339,6 +346,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @deprecated Use {@link #enableCheckpointing(long)} instead.
 	 */
 	@Deprecated
+	@Experimental
 	public StreamExecutionEnvironment enableCheckpointing() {
 		checkpointCfg.setCheckpointInterval(500);
 		return this;
@@ -360,6 +368,7 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	@Deprecated
 	@SuppressWarnings("deprecation")
+	@Experimental
 	public boolean isForceCheckpointing() {
 		return checkpointCfg.isForceCheckpointing();
 	}
@@ -397,6 +406,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 
 	 * @see #getStateBackend()
 	 */
+	@Experimental
 	public StreamExecutionEnvironment setStateBackend(AbstractStateBackend backend) {
 		this.defaultStateBackend = requireNonNull(backend);
 		return this;
@@ -408,6 +418,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 
 	 * @see #setStateBackend(AbstractStateBackend)
 	 */
+	@Experimental
 	public AbstractStateBackend getStateBackend() {
 		return defaultStateBackend;
 	}
@@ -421,6 +432,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @param numberOfExecutionRetries
 	 * 		The number of times the system will try to re-execute failed tasks.
 	 */
+	@Experimental
 	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
 		config.setNumberOfExecutionRetries(numberOfExecutionRetries);
 	}
@@ -432,6 +444,7 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * @return The number of times the system will try to re-execute failed tasks.
 	 */
+	@Experimental
 	public int getNumberOfExecutionRetries() {
 		return config.getNumberOfExecutionRetries();
 	}
@@ -443,6 +456,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @param parallelism
 	 * 		The parallelism to use as the default local parallelism.
 	 */
+	@Experimental
 	public static void setDefaultLocalParallelism(int parallelism) {
 		defaultLocalParallelism = parallelism;
 	}
@@ -548,6 +562,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 
 	 * @param characteristic The time characteristic.
 	 */
+	@Experimental
 	public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
 		this.timeCharacteristic = requireNonNull(characteristic);
 		if (characteristic == TimeCharacteristic.ProcessingTime) {
@@ -566,6 +581,7 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * @return The time characteristic.
 	 */
+	@Experimental
 	public TimeCharacteristic getStreamTimeCharacteristic() {
 		return timeCharacteristic;
 	}
@@ -997,6 +1013,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		a	negative value ensures retrying forever.
 	 * @return A data stream containing the strings received from the socket
 	 */
+	@Experimental
 	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry) {
 		return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
 				"Socket Stream");
@@ -1015,6 +1032,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		A character which splits received strings into records
 	 * @return A data stream containing the strings received from the socket
 	 */
+	@Experimental
 	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
 		return socketTextStream(hostname, port, delimiter, 0);
 	}
@@ -1031,6 +1049,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		allocated.
 	 * @return A data stream containing the strings received from the socket
 	 */
+	@Experimental
 	public DataStreamSource<String> socketTextStream(String hostname, int port) {
 		return socketTextStream(hostname, port, '\n');
 	}
@@ -1051,6 +1070,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream that represents the data created by the input format
 	 */
+	@Experimental
 	public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) {
 		return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), "Custom File source");
 	}
@@ -1069,6 +1089,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream that represents the data created by the input format
 	 */
+	@Experimental
 	public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
 		return createInput(inputFormat, typeInfo, "Custom File source");
 	}
@@ -1211,6 +1232,7 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * @return The streamgraph representing the transformations
 	 */
+	@Internal
 	public StreamGraph getStreamGraph() {
 		if (transformations.size() <= 0) {
 			throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
@@ -1234,6 +1256,7 @@ public abstract class StreamExecutionEnvironment {
 	 * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
 	 * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
 	 */
+	@Internal
 	public <F> F clean(F f) {
 		if (getConfig().isClosureCleanerEnabled()) {
 			ClosureCleaner.clean(f, true);
@@ -1254,6 +1277,7 @@ public abstract class StreamExecutionEnvironment {
 	 * This is not meant to be used by users. The API methods that create operators must call
 	 * this method.
 	 */
+	@Internal
 	public void addOperator(StreamTransformation<?> transformation) {
 		Preconditions.checkNotNull(transformation, "transformation must not be null.");
 		this.transformations.add(transformation);

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
index ae11cd9..03ce2f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.co;
 
 import java.io.Serializable;
 
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.util.Collector;
 
@@ -34,6 +35,7 @@ import org.apache.flink.util.Collector;
  * @param <OUT>
  *            Output type.
  */
+@Public
 public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
 
 	void flatMap1(IN1 value, Collector<OUT> out) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
index a545282..6a3b4e0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.co;
 
 import java.io.Serializable;
 
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.Function;
 
 /**
@@ -33,6 +34,7 @@ import org.apache.flink.api.common.functions.Function;
  * @param <OUT>
  *            Output type.
  */
+@Public
 public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
 
 	OUT map1(IN1 value) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
index 6746140..0e532ea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.functions.co;
 
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.RichFunction;
 
@@ -33,6 +34,7 @@ import org.apache.flink.api.common.functions.RichFunction;
  * @param <OUT>
  *            Output type.
  */
+@Public
 public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
 		CoFlatMapFunction<IN1, IN2, OUT> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
index e561408..c0d1350 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.functions.co;
 
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.RichFunction;
 
@@ -33,6 +34,7 @@ import org.apache.flink.api.common.functions.RichFunction;
  * @param <OUT>
  *            Output type.
  */
+@Public
 public abstract class RichCoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
 		CoMapFunction<IN1, IN2, OUT> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java
index 3bbb14b..c79b0dd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/DiscardingSink.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.streaming.api.functions.sink;
 
+import org.apache.flink.annotation.Public;
+
 /**
  * A stream sink that ignores all elements.
  * 
  * @param <T> The type of elements received by the sink.
  */
+@Public
 public class DiscardingSink<T> implements SinkFunction<T> {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
index 7853758..643c06c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
@@ -17,8 +17,10 @@
 
 package org.apache.flink.streaming.api.functions.sink;
 
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 
+@Public
 public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
index 21308ed..68a630b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.functions.sink;
 
 import java.io.Serializable;
 
+import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.Function;
 
 /**
@@ -26,6 +27,7 @@ import org.apache.flink.api.common.functions.Function;
  *
  * @param <IN> Input type parameter.
  */
+@Public
 public interface SinkFunction<IN> extends Function, Serializable {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/b54499b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
index ab380d7..8a516f1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.api.functions.source;
 
 
+import org.apache.flink.annotation.Experimental;
+
 /**
  * A marker interface that must be implemented by {@link SourceFunction}s that emit elements with
  * timestamps. The {@link SourceFunction} can extract the timestamp from the data and attach it to
@@ -36,4 +38,5 @@ package org.apache.flink.streaming.api.functions.source;
  *
  * @param <T> Type of the elements emitted by this source.
  */
+@Experimental
 public interface EventTimeSourceFunction<T> extends SourceFunction<T> { }


Mime
View raw message