flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [27/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:42:05 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
deleted file mode 100644
index 177a9ee..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ /dev/null
@@ -1,573 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
-  SingleOutputStreamOperator, GroupedDataStream}
-import scala.reflect.ClassTag
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.streaming.api.invokable.StreamInvokable
-import org.apache.flink.streaming.api.invokable.operator.{ GroupedReduceInvokable, StreamReduceInvokable }
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.streaming.api.function.sink.SinkFunction
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
-import org.apache.flink.streaming.api.windowing.policy.{ EvictionPolicy, TriggerPolicy }
-import org.apache.flink.streaming.api.collector.OutputSelector
-import scala.collection.JavaConversions._
-import java.util.HashMap
-import org.apache.flink.streaming.api.function.aggregation.SumFunction
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
-import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy
-
-class DataStream[T](javaStream: JavaStream[T]) {
-
-  /**
-   * Gets the underlying java DataStream object.
-   */
-  def getJavaStream: JavaStream[T] = javaStream
-
-  /**
-   * Sets the degree of parallelism of this operation. This must be greater than 1.
-   */
-  def setParallelism(dop: Int): DataStream[T] = {
-    javaStream match {
-      case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
-      case _ =>
-        throw new UnsupportedOperationException("Operator " + javaStream.toString +  " cannot " +
-          "have " +
-          "parallelism.")
-    }
-    this
-  }
-
-  /**
-   * Returns the degree of parallelism of this operation.
-   */
-  def getParallelism: Int = javaStream match {
-    case op: SingleOutputStreamOperator[_, _] => op.getParallelism
-    case _ =>
-      throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have" +
-        " "  +
-        "parallelism.")
-  }
-  
-  def setChainingStrategy(strategy: ChainingStrategy): DataStream[T] = {
-    javaStream match {
-      case ds: SingleOutputStreamOperator[_, _] => ds.setChainingStrategy(strategy)
-      case _ =>
-        throw new UnsupportedOperationException("Only supported for operators.")
-    }
-    this
-  }
-
-  /**
-   * Creates a new DataStream by merging DataStream outputs of
-   * the same type with each other. The DataStreams merged using this operator
-   * will be transformed simultaneously.
-   *
-   */
-  def merge(dataStreams: DataStream[T]*): DataStream[T] =
-    javaStream.merge(dataStreams.map(_.getJavaStream): _*)
-
-  /**
-   * Creates a new ConnectedDataStream by connecting
-   * DataStream outputs of different type with each other. The
-   * DataStreams connected using this operators can be used with CoFunctions.
-   *
-   */
-  def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] = 
-    javaStream.connect(dataStream.getJavaStream)
-
-  /**
-   * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
-   * be used with grouped operators like grouped reduce or grouped aggregations
-   *
-   */
-  def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*)
-
-  /**
-   * Groups the elements of a DataStream by the given field expressions to
-   * be used with grouped operators like grouped reduce or grouped aggregations
-   *
-   */
-  def groupBy(firstField: String, otherFields: String*): DataStream[T] = 
-   javaStream.groupBy(firstField +: otherFields.toArray: _*)   
-  
-  /**
-   * Groups the elements of a DataStream by the given K key to
-   * be used with grouped operators like grouped reduce or grouped aggregations
-   *
-   */
-  def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
-
-    val keyExtractor = new KeySelector[T, K] {
-      val cleanFun = clean(fun)
-      def getKey(in: T) = cleanFun(in)
-    }
-    javaStream.groupBy(keyExtractor)
-  }
-
-  /**
-   * Sets the partitioning of the DataStream so that the output tuples
-   * are broadcasted to every parallel instance of the next component. This
-   * setting only effects the how the outputs will be distributed between the
-   * parallel instances of the next processing operator.
-   *
-   */
-  def broadcast: DataStream[T] = javaStream.broadcast()
-  
-  /**
-   * Sets the partitioning of the DataStream so that the output values all go to 
-   * the first instance of the next processing operator. Use this setting with care
-   * since it might cause a serious performance bottleneck in the application.
-   */
-  def global: DataStream[T] = javaStream.global()
-
-  /**
-   * Sets the partitioning of the DataStream so that the output tuples
-   * are shuffled to the next component. This setting only effects the how the
-   * outputs will be distributed between the parallel instances of the next
-   * processing operator.
-   *
-   */
-  def shuffle: DataStream[T] = javaStream.shuffle()
-
-  /**
-   * Sets the partitioning of the DataStream so that the output tuples
-   * are forwarded to the local subtask of the next component (whenever
-   * possible). This is the default partitioner setting. This setting only
-   * effects the how the outputs will be distributed between the parallel
-   * instances of the next processing operator.
-   *
-   */
-  def forward: DataStream[T] = javaStream.forward()
-
-  /**
-   * Sets the partitioning of the DataStream so that the output tuples
-   * are distributed evenly to the next component.This setting only effects
-   * the how the outputs will be distributed between the parallel instances of
-   * the next processing operator.
-   *
-   */
-  def distribute: DataStream[T] = javaStream.distribute()
-
-  /**
-   * Initiates an iterative part of the program that creates a loop by feeding
-   * back data streams. To create a streaming iteration the user needs to define
-   * a transformation that creates two DataStreams.The first one one is the output
-   * that will be fed back to the start of the iteration and the second is the output
-   * stream of the iterative part.
-   * <p>
-   * stepfunction: initialStream => (feedback, output)
-   * <p>
-   * A common pattern is to use output splitting to create feedback and output DataStream.
-   * Please refer to the .split(...) method of the DataStream
-   * <p>
-   * By default a DataStream with iteration will never terminate, but the user
-   * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
-   * If no data received in the set time the stream terminates.
-   *
-   *
-   */
-  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),  
-        maxWaitTimeMillis:Long = 0): DataStream[R] = {
-    val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
-
-    val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
-    iterativeStream.closeWith(feedback.getJavaStream)
-    output
-  }
-
-  /**
-   * Applies an aggregation that that gives the current maximum of the data stream at
-   * the given position.
-   *
-   */
-  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-  
-  /**
-   * Applies an aggregation that that gives the current maximum of the data stream at
-   * the given field.
-   *
-   */
-  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
-  
-  /**
-   * Applies an aggregation that that gives the current minimum of the data stream at
-   * the given position.
-   *
-   */
-  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-  
-  /**
-   * Applies an aggregation that that gives the current minimum of the data stream at
-   * the given field.
-   *
-   */
-  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
-
-  /**
-   * Applies an aggregation that sums the data stream at the given position.
-   *
-   */
-  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-  
-  /**
-   * Applies an aggregation that sums the data stream at the given field.
-   *
-   */
-  def sum(field: String): DataStream[T] =  aggregate(AggregationType.SUM, field)
-
-  /**
-   * Applies an aggregation that that gives the current minimum element of the data stream by
-   * the given position. When equality, the first element is returned with the minimal value.
-   *
-   */
-  def minBy(position: Int): DataStream[T] = aggregate(AggregationType
-    .MINBY, position)
-    
-   /**
-   * Applies an aggregation that that gives the current minimum element of the data stream by
-   * the given field. When equality, the first element is returned with the minimal value.
-   *
-   */
-  def minBy(field: String): DataStream[T] = aggregate(AggregationType
-    .MINBY, field )
-
-   /**
-   * Applies an aggregation that that gives the current maximum element of the data stream by
-   * the given position. When equality, the first element is returned with the maximal value.
-   *
-   */
-  def maxBy(position: Int): DataStream[T] =
-    aggregate(AggregationType.MAXBY, position)
-    
-   /**
-   * Applies an aggregation that that gives the current maximum element of the data stream by
-   * the given field. When equality, the first element is returned with the maximal value.
-   *
-   */
-  def maxBy(field: String): DataStream[T] =
-    aggregate(AggregationType.MAXBY, field)
-    
-  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
-    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
-    aggregate(aggregationType, position)
-  }
-
-  private def aggregate(aggregationType: AggregationType, position: Int):
-    DataStream[T] = {
-
-    val jStream = javaStream.asInstanceOf[JavaStream[Product]]
-    val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
-
-    val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
-
-    val reducer = aggregationType match {
-      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).
-        getTypeClass()))
-      case _ => new agg.ProductComparableAggregator(aggregationType, true)
-    }
-
-    val invokable = jStream match {
-      case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer,
-        groupedStream.getKeySelector())
-      case _ => new StreamReduceInvokable(reducer)
-    }
-    new DataStream[Product](jStream.transform("aggregation", jStream.getType(),
-      invokable)).asInstanceOf[DataStream[T]]
-  }
-
-  /**
-   * Creates a new DataStream containing the current number (count) of
-   * received records.
-   *
-   */
-  def count: DataStream[Long] = new DataStream[java.lang.Long](
-    javaStream.count()).asInstanceOf[DataStream[Long]]
-
-  /**
-   * Creates a new DataStream by applying the given function to every element of this DataStream.
-   */
-  def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-    val mapper = new MapFunction[T, R] {
-      val cleanFun = clean(fun)
-      def map(in: T): R = cleanFun(in)
-    }
-
-    javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
-  }
-
-  /**
-   * Creates a new DataStream by applying the given function to every element of this DataStream.
-   */
-  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = {
-    if (mapper == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-
-    javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
-  }
-
-  /**
-   * Creates a new DataStream by applying the given function to every element and flattening
-   * the results.
-   */
-  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = {
-    if (flatMapper == null) {
-      throw new NullPointerException("FlatMap function must not be null.")
-    }
-   javaStream.transform("flatMap", implicitly[TypeInformation[R]], 
-       new FlatMapInvokable[T, R](flatMapper))
-  }
-
-  /**
-   * Creates a new DataStream by applying the given function to every element and flattening
-   * the results.
-   */
-  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("FlatMap function must not be null.")
-    }
-    val flatMapper = new FlatMapFunction[T, R] {
-      val cleanFun = clean(fun)
-      def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) }
-    }
-    flatMap(flatMapper)
-  }
-
-  /**
-   * Creates a new DataStream by applying the given function to every element and flattening
-   * the results.
-   */
-  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("FlatMap function must not be null.")
-    }
-    val flatMapper = new FlatMapFunction[T, R] {
-      val cleanFun = clean(fun)
-      def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect }
-    }
-    flatMap(flatMapper)
-  }
-
-  /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream
-   * using an associative reduce function.
-   */
-  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
-    if (reducer == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    javaStream match {
-      case ds: GroupedDataStream[_] => javaStream.transform("reduce",
-        javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector()))
-      case _ => javaStream.transform("reduce", javaStream.getType(),
-        new StreamReduceInvokable[T](reducer))
-    }
-  }
-
-  /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream
-   * using an associative reduce function.
-   */
-  def reduce(fun: (T, T) => T): DataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    val reducer = new ReduceFunction[T] {
-      val cleanFun = clean(fun)
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
-    }
-    reduce(reducer)
-  }
-
-  /**
-   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
-   */
-  def filter(filter: FilterFunction[T]): DataStream[T] = {
-    if (filter == null) {
-      throw new NullPointerException("Filter function must not be null.")
-    }
-    javaStream.filter(filter)
-  }
-
-  /**
-   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
-   */
-  def filter(fun: T => Boolean): DataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Filter function must not be null.")
-    }
-    val filter = new FilterFunction[T] {
-      val cleanFun = clean(fun)
-      def filter(in: T) = cleanFun(in)
-    }
-    this.filter(filter)
-  }
-
-  /**
-   * Create a WindowedDataStream that can be used to apply
-   * transformation like .reduce(...) or aggregations on
-   * preset chunks(windows) of the data stream. To define the windows one or
-   * more WindowingHelper-s such as Time, Count and
-   * Delta can be used.</br></br> When applied to a grouped data
-   * stream, the windows (evictions) and slide sizes (triggers) will be
-   * computed on a per group basis. </br></br> For more advanced control over
-   * the trigger and eviction policies please use to
-   * window(List(triggers), List(evicters))
-   */
-  def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
-    javaStream.window(windowingHelper: _*)
-
-  /**
-   * Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
-   * Windowing can be used to apply transformation like .reduce(...) or aggregations on
-   * preset chunks(windows) of the data stream.</br></br>For most common
-   * use-cases please refer to window(WindowingHelper[_]*)
-   *
-   */
-  def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]):
-    WindowedDataStream[T] = javaStream.window(triggers, evicters)
-
-  /**
-   *
-   * Operator used for directing tuples to specific named outputs using an
-   * OutputSelector. Calling this method on an operator creates a new
-   * SplitDataStream.
-   */
-  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream.split(selector)
-
-  /**
-   * Creates a new SplitDataStream that contains only the elements satisfying the
-   *  given output selector predicate.
-   */
-  def split(fun: T => String): SplitDataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("OutputSelector must not be null.")
-    }
-    val selector = new OutputSelector[T] {
-      val cleanFun = clean(fun)
-      def select(in: T): java.lang.Iterable[String] = {
-        List(cleanFun(in))
-      }
-    }
-    split(selector)
-  }
-
-  /**
-   * Initiates a temporal Join transformation that joins the elements of two
-   * data streams on key equality over a specified time window.
-   *
-   * This method returns a StreamJoinOperator on which the
-   * .onWindow(..) should be called to define the
-   * window, and then the .where(..) and .equalTo(..) methods can be used to defin
-   * the join keys.</p> The user can also use the apply method of the returned JoinedStream
-   * to use custom join function.
-   *
-   */
-  def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] =
-    new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
-
-  /**
-   * Initiates a temporal cross transformation that builds all pair
-   * combinations of elements of both DataStreams, i.e., it builds a Cartesian
-   * product.
-   *
-   * This method returns a StreamJoinOperator on which the
-   * .onWindow(..) should be called to define the
-   * window, and then the .where(..) and .equalTo(..) methods can be used to defin
-   * the join keys.</p> The user can also use the apply method of the returned JoinedStream
-   * to use custom join function.
-   *
-   */
-  def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] =
-    new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
-
-  /**
-   * Writes a DataStream to the standard output stream (stdout). For each
-   * element of the DataStream the result of .toString is
-   * written.
-   *
-   */
-  def print(): DataStream[T] = javaStream.print()
-
-  /**
-   * Writes a DataStream to the file specified by path in text format. The
-   * writing is performed periodically, in every millis milliseconds. For
-   * every element of the DataStream the result of .toString
-   * is written.
-   *
-   */
-  def writeAsText(path: String, millis: Long = 0): DataStream[T] =
-    javaStream.writeAsText(path, millis)
-
-  /**
-   * Writes a DataStream to the file specified by path in text format. The
-   * writing is performed periodically, in every millis milliseconds. For
-   * every element of the DataStream the result of .toString
-   * is written.
-   *
-   */
-  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
-    javaStream.writeAsCsv(path, millis)
-
-  /**
-   * Adds the given sink to this DataStream. Only streams with sinks added
-   * will be executed once the StreamExecutionEnvironment.execute(...)
-   * method is called.
-   *
-   */
-  def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] =
-    javaStream.addSink(sinkFuntion)
-
-  /**
-   * Adds the given sink to this DataStream. Only streams with sinks added
-   * will be executed once the StreamExecutionEnvironment.execute(...)
-   * method is called.
-   *
-   */
-  def addSink(fun: T => Unit): DataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Sink function must not be null.")
-    }
-    val sinkFunction = new SinkFunction[T] {
-      val cleanFun = clean(fun)
-      def invoke(in: T) = cleanFun(in)
-    }
-    this.addSink(sinkFunction)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
deleted file mode 100644
index 9e33f80..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
-
-/**
- * The SplitDataStream represents an operator that has been split using an
- * {@link OutputSelector}. Named outputs can be selected using the
- * {@link #select} function. To apply a transformation on the whole output simply call
- * the appropriate method on this stream.
- *
- * @param <OUT>
- *            The type of the output.
- */
-class SplitDataStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){
-
-  /**
-   *  Sets the output names for which the next operator will receive values.
-   */
-  def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)
-  
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
deleted file mode 100644
index a408ec0..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import scala.reflect.ClassTag
-import org.apache.commons.lang.Validate
-import org.apache.flink.api.common.functions.CrossFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.scala.typeutils.CaseClassSerializer
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.api.function.co.CrossWindowFunction
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
-import java.util.concurrent.TimeUnit
-
-class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
-  TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
-
-  override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = {
-
-    val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
-      (l: I1, r: I2) => (l, r))
-
-    val returnType = new CaseClassTypeInfo[(I1, I2)](
-
-      classOf[(I1, I2)], Seq(input1.getType, input2.getType), Array("_1", "_2")) {
-
-      override def createSerializer: TypeSerializer[(I1, I2)] = {
-        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-        for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer
-        }
-
-        new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
-          override def createInstance(fields: Array[AnyRef]) = {
-            (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
-          }
-        }
-      }
-    }
-
-    val javaStream = input1.connect(input2).addGeneralWindowCombine(
-      crossWindowFunction,
-      returnType, windowSize,
-      slideInterval, timeStamp1, timeStamp2)
-
-    new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream)
-  }
-}
-object StreamCrossOperator {
-
-  private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
-                                           javaStream: JavaStream[(I1, I2)]) extends
-    DataStream[(I1, I2)](javaStream) with TemporalWindow[CrossWindow[I1, I2]] {
-
-    /**
-     * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf
-     * call will be emitted.
-     *
-     */
-    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
-
-      val invokable = new CoWindowInvokable[I1, I2, R](
-        clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
-        op.timeStamp2)
-
-      javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
-        invokable)
-
-      javaStream.setType(implicitly[TypeInformation[R]])
-    }
-    
-    override def every(length: Long, timeUnit: TimeUnit): CrossWindow[I1, I2] = {
-      every(timeUnit.toMillis(length))
-    }
-
-    override def every(length: Long): CrossWindow[I1, I2] = {
-      val builder = javaStream.getExecutionEnvironment().getStreamGraph()
-      val invokable = builder.getInvokable(javaStream.getId())
-      invokable.asInstanceOf[CoWindowInvokable[_,_,_]].setSlideSize(length)
-      this
-    }
-  }
-
-  private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2],
-                                                       crossFunction: (I1, I2) => R):
-  CrossWindowFunction[I1, I2, R] = {
-    Validate.notNull(crossFunction, "Join function must not be null.")
-
-    val crossFun = new CrossFunction[I1, I2, R] {
-      val cleanFun = op.input1.clean(crossFunction)
-
-      override def cross(first: I1, second: I2): R = {
-        cleanFun(first, second)
-      }
-    }
-
-    new CrossWindowFunction[I1, I2, R](crossFun)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
deleted file mode 100644
index 394673c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import scala.reflect.ClassTag
-import org.apache.commons.lang.Validate
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
-import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, SourceFunction }
-import org.apache.flink.util.Collector
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.streaming.api.function.source.FileMonitoringFunction.WatchType
-
-class StreamExecutionEnvironment(javaEnv: JavaEnv) {
-
-  /**
-   * Sets the degree of parallelism (DOP) for operations executed through this environment.
-   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
-   * x parallel instances. This value can be overridden by specific operations using
-   * [[DataStream.setParallelism]].
-   */
-  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
-    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
-  }
-
-  /**
-   * Returns the default degree of parallelism for this execution environment. Note that this
-   * value can be overridden by individual operations using [[DataStream.setParallelism]]
-   */
-  def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
-
-  /**
-   * Sets the maximum time frequency (milliseconds) for the flushing of the
-   * output buffers. By default the output buffers flush frequently to provide
-   * low latency and to aid smooth developer experience. Setting the parameter
-   * can result in three logical modes:
-   *
-   * <ul>
-   * <li>
-   * A positive integer triggers flushing periodically by that integer</li>
-   * <li>
-   * 0 triggers flushing after every record thus minimizing latency</li>
-   * <li>
-   * -1 triggers flushing only when the output buffer is full thus maximizing
-   * throughput</li>
-   * </ul>
-   *
-   */
-  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
-    javaEnv.setBufferTimeout(timeoutMillis)
-    this
-  }
-
-  /**
-   * Gets the default buffer timeout set for this environment
-   */
-  def getBufferTimout: Long = javaEnv.getBufferTimeout()
-
-  /**
-   * Creates a DataStream that represents the Strings produced by reading the
-   * given file line wise. The file will be read with the system's default
-   * character set.
-   *
-   */
-  def readTextFile(filePath: String): DataStream[String] =
-    javaEnv.readTextFile(filePath)
-
-  /**
-   * Creates a DataStream that contains the contents of file created while
-   * system watches the given path. The file will be read with the system's
-   * default character set. The user can check the monitoring interval in milliseconds,
-   * and the way file modifications are handled. By default it checks for only new files
-   * every 100 milliseconds.
-   *
-   */
-  def readFileStream(StreamPath: String, intervalMillis: Long = 100, watchType: WatchType = 
-    WatchType.ONLY_NEW_FILES): DataStream[String] =
-    javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
-
-  /**
-   * Creates a new DataStream that contains the strings received infinitely
-   * from socket. Received strings are decoded by the system's default
-   * character set.
-   *
-   */
-  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
-    javaEnv.socketTextStream(hostname, port, delimiter)
-
-  /**
-   * Creates a new DataStream that contains the strings received infinitely
-   * from socket. Received strings are decoded by the system's default
-   * character set, uses '\n' as delimiter.
-   *
-   */
-  def socketTextStream(hostname: String, port: Int): DataStream[String] =
-    javaEnv.socketTextStream(hostname, port)
-
-  /**
-   * Creates a new DataStream that contains a sequence of numbers.
-   *
-   */
-  def generateSequence(from: Long, to: Long): DataStream[Long] = {
-    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
-      asInstanceOf[DataStream[Long]]
-  }
-
-  /**
-   * Creates a DataStream that contains the given elements. The elements must all be of the
-   * same type and must be serializable.
-   *
-   * * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a degree of parallelism of one.
-   */
-  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
-    val typeInfo = implicitly[TypeInformation[T]]
-    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
-  }
-
-  /**
-   * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
-   * because the framework may move the elements into the cluster if needed.
-   *
-   * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a degree of parallelism of one.
-   */
-  def fromCollection[T: ClassTag: TypeInformation](
-    data: Seq[T]): DataStream[T] = {
-    Validate.notNull(data, "Data must not be null.")
-    val typeInfo = implicitly[TypeInformation[T]]
-
-    val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
-        .asJavaCollection(data))
-        
-    javaEnv.addSource(sourceFunction, typeInfo)
-  }
-
-  /**
-   * Create a DataStream using a user defined source function for arbitrary
-   * source functionality. By default sources have a parallelism of 1. 
-   * To enable parallel execution, the user defined source should implement 
-   * ParallelSourceFunction or extend RichParallelSourceFunction. 
-   * In these cases the resulting source will have the parallelism of the environment. 
-   * To change this afterwards call DataStreamSource.setParallelism(int)
-   *
-   */
-  def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
-    Validate.notNull(function, "Function must not be null.")
-    val cleanFun = StreamExecutionEnvironment.clean(function)
-    val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.addSource(cleanFun, typeInfo)
-  }
-  
-   /**
-   * Create a DataStream using a user defined source function for arbitrary
-   * source functionality.
-   *
-   */
-  def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
-    Validate.notNull(function, "Function must not be null.")
-    val sourceFunction = new SourceFunction[T] {
-      val cleanFun = StreamExecutionEnvironment.clean(function)
-      override def invoke(out: Collector[T]) {
-        cleanFun(out)
-      }
-    }
-    addSource(sourceFunction)
-  }
-
-  /**
-   * Triggers the program execution. The environment will execute all parts of
-   * the program that have resulted in a "sink" operation. Sink operations are
-   * for example printing results or forwarding them to a message queue.
-   * <p>
-   * The program execution will be logged and displayed with a generated
-   * default name.
-   *
-   */
-  def execute() = javaEnv.execute()
-
-  /**
-   * Triggers the program execution. The environment will execute all parts of
-   * the program that have resulted in a "sink" operation. Sink operations are
-   * for example printing results or forwarding them to a message queue.
-   * <p>
-   * The program execution will be logged and displayed with the provided name
-   *
-   */
-  def execute(jobName: String) = javaEnv.execute(jobName)
-
-  /**
-   * Creates the plan with which the system will execute the program, and
-   * returns it as a String using a JSON representation of the execution data
-   * flow graph. Note that this needs to be called, before the plan is
-   * executed.
-   *
-   */
-  def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON
-
-}
-
-object StreamExecutionEnvironment {
-  
-  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
-    ClosureCleaner.clean(f, checkSerializable)
-    f
-  }
-
-  /**
-   * Creates an execution environment that represents the context in which the program is
-   * currently executed. If the program is invoked standalone, this method returns a local
-   * execution environment. If the program is invoked from within the command line client
-   * to be submitted to a cluster, this method returns the execution environment of this cluster.
-   */
-  def getExecutionEnvironment: StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
-  }
-
-  /**
-   * Creates a local execution environment. The local execution environment will run the program in
-   * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
-   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
-   */
-  def createLocalEnvironment(
-    degreeOfParallelism: Int =  Runtime.getRuntime.availableProcessors()):
-  StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
-  }
-
-  /**
-   * Creates a remote execution environment. The remote environment sends (parts of) the program to
-   * a cluster for execution. Note that all file paths used in the program must be accessible from
-   * the cluster. The execution will use the cluster's default degree of parallelism, unless the
-   * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]].
-   *
-   * @param host The host name or address of the master (JobManager),
-   *             where the program should be executed.
-   * @param port The port of the master (JobManager), where the program should be executed.
-   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
-   *                 program uses
-   *                 user-defined functions, user-defined input formats, or any libraries,
-   *                 those must be
-   *                 provided in the JAR files.
-   */
-  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
-  StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
-  }
-
-  /**
-   * Creates a remote execution environment. The remote environment sends (parts of) the program
-   * to a cluster for execution. Note that all file paths used in the program must be accessible
-   * from the cluster. The execution will use the specified degree of parallelism.
-   *
-   * @param host The host name or address of the master (JobManager),
-   *             where the program should be executed.
-   * @param port The port of the master (JobManager), where the program should be executed.
-   * @param degreeOfParallelism The degree of parallelism to use during the execution.
-   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
-   *                 program uses
-   *                 user-defined functions, user-defined input formats, or any libraries,
-   *                 those must be
-   *                 provided in the JAR files.
-   */
-  def createRemoteEnvironment(
-    host: String,
-    port: Int,
-    degreeOfParallelism: Int,
-    jarFiles: String*): StreamExecutionEnvironment = {
-    val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
-    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
-    new StreamExecutionEnvironment(javaEnv)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
deleted file mode 100644
index 1bd1bfb..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import scala.Array.canBuildFrom
-import scala.reflect.ClassTag
-import org.apache.commons.lang.Validate
-import org.apache.flink.api.common.functions.JoinFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.operators.Keys
-import org.apache.flink.api.scala.typeutils.CaseClassSerializer
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.function.co.JoinWindowFunction
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.util.keys.KeySelectorUtil
-import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
-import java.util.concurrent.TimeUnit
-
-class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends 
-TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
-
-  override def createNextWindowOperator() = {
-    new StreamJoinOperator.JoinWindow[I1, I2](this)
-  }
-}
-
-object StreamJoinOperator {
-
-  class JoinWindow[I1, I2](private[flink]op: StreamJoinOperator[I1, I2]) extends 
-  TemporalWindow[JoinWindow[I1, I2]] {
-
-    private[flink] val type1 = op.input1.getType()
-
-    /**
-     * Continues a temporal Join transformation by defining
-     * the fields in the first stream to be used as keys for the join.
-     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
-     * to define the second key.
-     */
-    def where(fields: Int*) = {
-      new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(fields.toArray, type1), type1))
-    }
-
-    /**
-     * Continues a temporal Join transformation by defining
-     * the fields in the first stream to be used as keys for the join.
-     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
-     * to define the second key.
-     */
-    def where(firstField: String, otherFields: String*) =
-      new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1), type1))
-
-    /**
-     * Continues a temporal Join transformation by defining
-     * the keyselector function that will be used to extract keys from the first stream
-     * for the join.
-     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
-     * to define the second key.
-     */
-    def where[K: TypeInformation](fun: (I1) => K) = {
-      val keyType = implicitly[TypeInformation[K]]
-      val keyExtractor = new KeySelector[I1, K] {
-        val cleanFun = op.input1.clean(fun)
-        def getKey(in: I1) = cleanFun(in)
-      }
-      new JoinPredicate[I1, I2](op, keyExtractor)
-    }
-
-    override def every(length: Long, timeUnit: TimeUnit): JoinWindow[I1, I2] = {
-      every(timeUnit.toMillis(length))
-    }
-
-    override def every(length: Long): JoinWindow[I1, I2] = {
-      op.slideInterval = length
-      this
-    }
-
-  }
-
-  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
-    private[flink] val keys1: KeySelector[I1, _]) {
-    private[flink] var keys2: KeySelector[I2, _] = null
-    private[flink] val type2 = op.input2.getType()
-
-    /**
-     * Creates a temporal join transformation by defining the second join key.
-     * The returned transformation wrapes each joined element pair in a tuple2:
-     * (first, second)
-     * To define a custom wrapping, use JoinedStream.apply(...)
-     */
-    def equalTo(fields: Int*): JoinedStream[I1, I2] = {
-      finish(KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(fields.toArray, type2), type2))
-    }
-
-    /**
-     * Creates a temporal join transformation by defining the second join key.
-     * The returned transformation wrapes each joined element pair in a tuple2:
-     * (first, second)
-     * To define a custom wrapping, use JoinedStream.apply(...)
-     */
-    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] =
-      finish(KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2), type2))
-
-    /**
-     * Creates a temporal join transformation by defining the second join key.
-     * The returned transformation wrapes each joined element pair in a tuple2:
-     * (first, second)
-     * To define a custom wrapping, use JoinedStream.apply(...)
-     */
-    def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
-      val keyType = implicitly[TypeInformation[K]]
-      val keyExtractor = new KeySelector[I2, K] {
-        val cleanFun = op.input1.clean(fun)
-        def getKey(in: I2) = cleanFun(in)
-      }
-      finish(keyExtractor)
-    }
-
-    private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
-      this.keys2 = keys2
-      new JoinedStream[I1, I2](this, createJoinOperator())
-    }
-
-    private def createJoinOperator(): JavaStream[(I1, I2)] = {
-
-      val returnType = new CaseClassTypeInfo[(I1, I2)](
-
-        classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) {
-
-        override def createSerializer: TypeSerializer[(I1, I2)] = {
-          val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
-          for (i <- 0 until getArity) {
-            fieldSerializers(i) = types(i).createSerializer
-          }
-
-          new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
-            override def createInstance(fields: Array[AnyRef]) = {
-              (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
-            }
-          }
-        }
-      }
-
-      return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
-        .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
-          returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
-    }
-  }
-
-  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends 
-  DataStream[(I1, I2)](javaStream) {
-
-    private val op = jp.op
-
-    /**
-     * Sets a wrapper for the joined elements. For each joined pair, the result of the
-     * udf call will be emitted.
-     */
-    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
-
-      val invokable = new CoWindowInvokable[I1, I2, R](
-        clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
-        op.timeStamp2)
-
-      javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
-        invokable)
-
-      javaStream.setType(implicitly[TypeInformation[R]])
-    }
-  }
-
-  private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2],
-    joinFunction: (I1, I2) => R) = {
-    Validate.notNull(joinFunction, "Join function must not be null.")
-
-    val joinFun = new JoinFunction[I1, I2, R] {
-
-      val cleanFun = jp.op.input1.clean(joinFunction)
-
-      override def join(first: I1, second: I2): R = {
-        cleanFun(first, second)
-      }
-    }
-
-    new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
deleted file mode 100644
index fd3a4a9..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.streaming.api.datastream.temporaloperator.{ TemporalOperator => JTempOp }
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
-import org.apache.flink.streaming.api.windowing.helper.Timestamp
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment._
-
-abstract class TemporalOperator[I1, I2, OP <: TemporalWindow[OP]](
-  i1: JavaStream[I1], i2: JavaStream[I2]) extends JTempOp[I1, I2, OP](i1, i2) {
-
-  def onWindow(length: Long, ts1: I1 => Long, ts2: I2 => Long, startTime: Long = 0): OP = {
-    val timeStamp1 = getTS(ts1)
-    val timeStamp2 = getTS(ts2)
-    onWindow(length, timeStamp1, timeStamp2, startTime)
-  }
-
-  def getTS[R](ts: R => Long): Timestamp[R] = {
-    new Timestamp[R] {
-      val cleanFun = clean(ts, true)
-      def getTimestamp(in: R) = cleanFun(in)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
deleted file mode 100644
index 5c734bf..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import scala.Array.canBuildFrom
-import scala.collection.JavaConversions.iterableAsScalaIterable
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
-import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream}
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.function.aggregation.SumFunction
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
-import org.apache.flink.streaming.api.windowing.helper._
-import org.apache.flink.util.Collector
-
-class WindowedDataStream[T](javaStream: JavaWStream[T]) {
-
-  /**
-   * Defines the slide size (trigger frequency) for the windowed data stream.
-   * This controls how often the user defined function will be triggered on
-   * the window.
-   */
-  def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
-    javaStream.every(windowingHelper: _*)
-
-  /**
-   * Groups the elements of the WindowedDataStream using the given
-   * field positions. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a central fashion),
-   * but the user defined functions will be applied on a per group basis.
-   * </br></br> To get windows and triggers on a per group basis apply the
-   * DataStream.window(...) operator on an already grouped data stream.
-   *
-   */
-  def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*)
-
-  /**
-   * Groups the elements of the WindowedDataStream using the given
-   * field expressions. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a central fashion),
-   * but the user defined functions will be applied on a per group basis.
-   * </br></br> To get windows and triggers on a per group basis apply the
-   * DataStream.window(...) operator on an already grouped data stream.
-   *
-   */
-  def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
-   javaStream.groupBy(firstField +: otherFields.toArray: _*)   
-    
-  /**
-   * Groups the elements of the WindowedDataStream using the given
-   * KeySelector function. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a central fashion),
-   * but the user defined functions will be applied on a per group basis.
-   * </br></br> To get windows and triggers on a per group basis apply the
-   * DataStream.window(...) operator on an already grouped data stream.
-   *
-   */
-  def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
-
-    val keyExtractor = new KeySelector[T, K] {
-      val cleanFun = clean(fun)
-      def getKey(in: T) = cleanFun(in)
-    }
-    javaStream.groupBy(keyExtractor)
-  }
-
-  /**
-   * Applies a reduce transformation on the windowed data stream by reducing
-   * the current window at every trigger.
-   *
-   */
-  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
-    if (reducer == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    javaStream.reduce(reducer)
-  }
-
-  /**
-   * Applies a reduce transformation on the windowed data stream by reducing
-   * the current window at every trigger.
-   *
-   */
-  def reduce(fun: (T, T) => T): DataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    val reducer = new ReduceFunction[T] {
-      val cleanFun = clean(fun)
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
-    }
-    reduce(reducer)
-  }
-
-  /**
-   * Applies a reduceGroup transformation on the windowed data stream by reducing
-   * the current window at every trigger. In contrast with the simple binary reduce operator,
-   * groupReduce exposes the whole window through the Iterable interface.
-   * </br>
-   * </br>
-   * Whenever possible try to use reduce instead of groupReduce for increased efficiency
-   */
-  def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]):
-  DataStream[R] = {
-    if (reducer == null) {
-      throw new NullPointerException("GroupReduce function must not be null.")
-    }
-    javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]])
-  }
-
-  /**
-   * Applies a reduceGroup transformation on the windowed data stream by reducing
-   * the current window at every trigger. In contrast with the simple binary reduce operator,
-   * groupReduce exposes the whole window through the Iterable interface.
-   * </br>
-   * </br>
-   * Whenever possible try to use reduce instead of groupReduce for increased efficiency
-   */
-  def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit):
-  DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("GroupReduce function must not be null.")
-    }
-    val reducer = new GroupReduceFunction[T, R] {
-      val cleanFun = clean(fun)
-      def reduce(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) }
-    }
-    reduceGroup(reducer)
-  }
-
-  /**
-   * Applies an aggregation that that gives the maximum of the elements in the window at
-   * the given position.
-   *
-   */
-  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
-  
-  /**
-   * Applies an aggregation that that gives the maximum of the elements in the window at
-   * the given field.
-   *
-   */
-  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
-
-  /**
-   * Applies an aggregation that that gives the minimum of the elements in the window at
-   * the given position.
-   *
-   */
-  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
-  
-  /**
-   * Applies an aggregation that that gives the minimum of the elements in the window at
-   * the given field.
-   *
-   */
-  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
-
-  /**
-   * Applies an aggregation that sums the elements in the window at the given position.
-   *
-   */
-  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
-  
-  /**
-   * Applies an aggregation that sums the elements in the window at the given field.
-   *
-   */
-  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
-
-  /**
-   * Applies an aggregation that that gives the maximum element of the window by
-   * the given position. When equality, returns the first.
-   *
-   */
-  def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
-    position)
-    
-  /**
-   * Applies an aggregation that that gives the maximum element of the window by
-   * the given field. When equality, returns the first.
-   *
-   */
-  def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
-    field)
-
-  /**
-   * Applies an aggregation that that gives the minimum element of the window by
-   * the given position. When equality, returns the first.
-   *
-   */
-  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
-    position)
-    
-   /**
-   * Applies an aggregation that that gives the minimum element of the window by
-   * the given field. When equality, returns the first.
-   *
-   */
-  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
-    field)
-    
-  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
-    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
-    aggregate(aggregationType, position)
-  }  
-
-  def aggregate(aggregationType: AggregationType, position: Int):
-  DataStream[T] = {
-
-    val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
-    val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
-
-    val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
-
-    val reducer = aggregationType match {
-      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
-        outType.getTypeAt(position).getTypeClass()))
-      case _ => new agg.ProductComparableAggregator(aggregationType, true)
-    }
-
-    new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
deleted file mode 100644
index 222eb6d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api
-
-import _root_.scala.reflect.ClassTag
-import language.experimental.macros
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
-import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
-import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
-
-package object scala {
-  // We have this here so that we always have generated TypeInformationS when
-  // using the Scala API
-  implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
-
-  implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
-    new DataStream[R](javaStream)
-
-  implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
-    new WindowedDataStream[R](javaWStream)
-
-  implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] =
-    new SplitDataStream[R](javaStream)
-
-  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):
-  ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
-
-  implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]) : DataStream[T] =
-    StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq)
-
-
-  private[flink] def fieldNames2Indices(
-      typeInfo: TypeInformation[_],
-      fields: Array[String]): Array[Int] = {
-    typeInfo match {
-      case ti: CaseClassTypeInfo[_] =>
-        val result = ti.getFieldIndices(fields)
-
-        if (result.contains(-1)) {
-          throw new IllegalArgumentException("Fields '" + fields.mkString(", ") +
-            "' are not valid for '" + ti.toString + "'.")
-        }
-
-        result
-
-      case _ =>
-        throw new UnsupportedOperationException("Specifying fields by name is only" +
-          "supported on Case Classes (for now).")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
deleted file mode 100644
index eedee0e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala.windowing
-
-import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
-import org.apache.commons.lang.Validate
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
-
-object Delta {
-
-  /**
-   * Creates a delta helper representing a delta trigger or eviction policy.
-   * </br></br> This policy calculates a delta between the data point which
-   * triggered last and the currently arrived data point. It triggers if the
-   * delta is higher than a specified threshold. </br></br> In case it gets
-   * used for eviction, this policy starts from the first element of the
-   * buffer and removes all elements from the buffer which have a higher delta
-   * then the threshold. As soon as there is an element with a lower delta,
-   * the eviction stops.
-   */
-  def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = {
-    Validate.notNull(deltaFunction, "Delta function must not be null")
-    val df = new DeltaFunction[T] {
-      val cleanFun = clean(deltaFunction)
-      override def getDelta(first: T, second: T) = cleanFun(first, second)
-    }
-    JavaDelta.of(threshold, df, initVal)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
deleted file mode 100644
index 9a69369..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala.windowing
-
-import java.util.concurrent.TimeUnit
-import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
-
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.windowing.helper.Timestamp
-import org.apache.commons.lang.Validate
-
-object Time {
-
-  /**
-   * Creates a helper representing a time trigger which triggers every given
-   * length (slide size) or a time eviction which evicts all elements older
-   * than length (window size) using System time.
-   *
-   */
-  def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] =
-    JavaTime.of(windowSize, timeUnit)
-
-  /**
-   * Creates a helper representing a time trigger which triggers every given
-   * length (slide size) or a time eviction which evicts all elements older
-   * than length (window size) using a user defined timestamp extractor.
-   *
-   */
-  def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = {
-    Validate.notNull(timestamp, "Timestamp must not be null.")
-    val ts = new Timestamp[R] {
-      val fun = clean(timestamp, true)
-      override def getTimestamp(in: R) = fun(in)
-    }
-    JavaTime.of(windowSize, ts, startTime)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml
deleted file mode 100644
index 386ef73..0000000
--- a/flink-addons/flink-streaming/pom.xml
+++ /dev/null
@@ -1,75 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-addons</artifactId>
-		<version>0.9-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-streaming-parent</artifactId>
-	<name>flink-streaming</name>
-	<packaging>pom</packaging>
-
-	<modules>
-		<module>flink-streaming-core</module>
-		<module>flink-streaming-scala</module>
-		<module>flink-streaming-examples</module>
-		<module>flink-streaming-connectors</module>
-	</modules>
-	
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-compiler</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-	</dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-tachyon/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-tachyon/pom.xml b/flink-addons/flink-tachyon/pom.xml
deleted file mode 100644
index de36546..0000000
--- a/flink-addons/flink-tachyon/pom.xml
+++ /dev/null
@@ -1,117 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-addons</artifactId>
-		<version>0.9-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-tachyon</artifactId>
-	<name>flink-tachyon</name>
-
-	<packaging>jar</packaging>
-
-	<!--
-		This is a Hadoop2 only flink module.
-	-->
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java-examples</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.tachyonproject</groupId>
-			<artifactId>tachyon</artifactId>
-			<version>0.5.0</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.tachyonproject</groupId>
-			<artifactId>tachyon</artifactId>
-			<version>0.5.0</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.eclipse.jetty</groupId>
-			<artifactId>jetty-util</artifactId>
-			<version>7.6.8.v20121106</version><!--$NO-MVN-MAN-VER$-->
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-common</artifactId>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-hdfs</artifactId>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-hdfs</artifactId>
-			<scope>test</scope>
-			<type>test-jar</type>
-			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-common</artifactId>
-			<scope>test</scope>
-			<type>test-jar</type>
-			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
-		</dependency>
-	</dependencies>
-	<dependencyManagement>
-		<dependencies>
-			<dependency>
-				<groupId>org.eclipse.jetty</groupId>
-				<artifactId>jetty-server</artifactId>
-				<version>7.6.8.v20121106</version>
-				<scope>test</scope>
-			</dependency>
-			<dependency>
-				<groupId>org.eclipse.jetty</groupId>
-				<artifactId>jetty-servlet</artifactId>
-				<version>7.6.8.v20121106</version>
-				<scope>test</scope>
-			</dependency>
-		</dependencies>
-	</dependencyManagement>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
deleted file mode 100644
index 7318894..0000000
--- a/flink-addons/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tachyon;
-
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.examples.java.wordcount.WordCount;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.StringWriter;
-
-/**
- * This test should logically be located in the 'flink-runtime' tests. However, this project
- * has already all dependencies required (flink-java-examples). Also, the DOPOneExecEnv is here.
- */
-public class HDFSTest {
-
-	private String hdfsURI;
-	private MiniDFSCluster hdfsCluster;
-	private org.apache.hadoop.fs.Path hdPath;
-	private org.apache.hadoop.fs.FileSystem hdfs;
-
-	@Before
-	public void createHDFS() {
-		try {
-			Configuration hdConf = new Configuration();
-
-			File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
-			FileUtil.fullyDelete(baseDir);
-			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-			hdfsCluster = builder.build();
-
-			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-
-			hdPath = new org.apache.hadoop.fs.Path("/test");
-			hdfs = hdPath.getFileSystem(hdConf);
-			FSDataOutputStream stream = hdfs.create(hdPath);
-			for(int i = 0; i < 10; i++) {
-				stream.write("Hello HDFS\n".getBytes());
-			}
-			stream.close();
-
-		} catch(Throwable e) {
-			e.printStackTrace();
-			Assert.fail("Test failed " + e.getMessage());
-		}
-	}
-
-	@After
-	public void destroyHDFS() {
-		try {
-			hdfs.delete(hdPath, false);
-			hdfsCluster.shutdown();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-
-	}
-
-	@Test
-	public void testHDFS() {
-
-		Path file = new Path(hdfsURI + hdPath);
-		org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
-		try {
-			FileSystem fs = file.getFileSystem();
-			Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
-			new TachyonFileSystemWrapperTest.DopOneTestEnvironment();
-			try {
-				WordCount.main(new String[]{file.toString(), result.toString()});
-			} catch(Throwable t) {
-				t.printStackTrace();
-				Assert.fail("Test failed with " + t.getMessage());
-			}
-			Assert.assertTrue("No result file present", hdfs.exists(result));
-			// validate output:
-			org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
-			StringWriter writer = new StringWriter();
-			IOUtils.copy(inStream, writer);
-			String resultString = writer.toString();
-
-			Assert.assertEquals("hdfs 10\n" +
-					"hello 10\n", resultString);
-			inStream.close();
-
-		} catch (IOException e) {
-			e.printStackTrace();
-			Assert.fail("Error in test: " + e.getMessage() );
-		}
-	}
-
-	@Test
-	public void testAvroOut() {
-		String type = "one";
-		AvroOutputFormat<String> avroOut =
-				new AvroOutputFormat<String>( String.class );
-
-		org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
-
-		avroOut.setOutputFilePath(new Path(result.toString()));
-		avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
-		avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
-
-		try {
-			avroOut.open(0, 2);
-			avroOut.writeRecord(type);
-			avroOut.close();
-
-			avroOut.open(1, 2);
-			avroOut.writeRecord(type);
-			avroOut.close();
-
-
-			Assert.assertTrue("No result file present", hdfs.exists(result));
-			FileStatus[] files = hdfs.listStatus(result);
-			Assert.assertEquals(2, files.length);
-			for(FileStatus file : files) {
-				Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName()));
-			}
-
-		} catch (IOException e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-}


Mime
View raw message