flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/5] flink git commit: [FLINK-3413] [streaming] Remove implicit Seq -> DataStream conversion
Date Tue, 16 Feb 2016 18:28:14 GMT
[FLINK-3413] [streaming] Remove implicit Seq -> DataStream conversion

Because the implicit conversion creates a new ExecutionEnvironment, it leads to
strange errors when used withing programs with more than one source.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cce136b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cce136b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cce136b

Branch: refs/heads/master
Commit: 8cce136bf89fa2a399db8b25a2045e57d5cc4f61
Parents: 5b1231d
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Feb 16 11:14:26 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Feb 16 17:42:18 2016 +0100

----------------------------------------------------------------------
 .../examples/scala/StreamingTableFilter.scala   | 22 +++++++++++---------
 .../flink/streaming/api/scala/package.scala     |  3 ---
 2 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8cce136b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
index 63dddc9..b91b5ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
@@ -39,27 +39,29 @@ object StreamingTableFilter {
       return
     }
 
-    val cars = genCarStream().toTable
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    
+    val cars = env.fromCollection(genCarStream()).toTable
       .filter('carId === 0)
       .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
       .toDataStream[CarEvent]
 
     cars.print()
 
-    StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
+    env.execute("TopSpeedWindowing")
 
   }
 
-  def genCarStream(): DataStream[CarEvent] = {
+  def genCarStream(): Stream[CarEvent] = {
 
-    def nextSpeed(carEvent : CarEvent) : CarEvent =
-    {
-      val next =
-        if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed
- 5)
-      CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis)
+    def nextSpeed(carEvent : CarEvent) : CarEvent = {
+      val next = if (Random.nextBoolean()) min(100, carEvent.speed + 5)
+                 else max(0, carEvent.speed - 5)
+      
+      CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis())
     }
-    def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
-    {
+    
+    def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] = {
       Thread.sleep(1000)
       speeds.append(carStream(speeds.map(nextSpeed)))
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/8cce136b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index e668064..26b0265 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -46,9 +46,6 @@ package object scala {
   implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: ConnectedJavaStreams[IN1,
IN2]):
   ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream)
 
-  implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]) : DataStream[T]
=
-    StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq)
-
   private[flink] def fieldNames2Indices(
       typeInfo: TypeInformation[_],
       fields: Array[String]): Array[Int] = {


Mime
View raw message