flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [4/5] incubator-flink git commit: [scala] [streaming] Added package file for streaming scala api typeinfo implicits and conversions
Date Tue, 06 Jan 2015 14:43:23 GMT
[scala] [streaming] Added package file for streaming scala api typeinfo implicits and conversions


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5daa45c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5daa45c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5daa45c2

Branch: refs/heads/master
Commit: 5daa45c2c2f296e5018a2fb2b1989fa3d32f59fb
Parents: 8500ad0
Author: Gyula Fora <gyfora@apache.org>
Authored: Tue Jan 6 12:05:22 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Tue Jan 6 15:09:04 2015 +0100

----------------------------------------------------------------------
 .../examples/windowing/TopSpeedWindowing.scala  |  3 +-
 .../scala/examples/windowing/WindowJoin.scala   |  3 +-
 .../api/scala/ConnectedDataStream.scala         |  3 +-
 .../flink/streaming/api/scala/DataStream.scala  |  3 --
 .../streaming/api/scala/SplitDataStream.scala   |  2 -
 .../api/scala/StreamCrossOperator.scala         |  2 -
 .../api/scala/StreamExecutionEnvironment.scala  |  4 +-
 .../api/scala/StreamJoinOperator.scala          |  1 -
 .../api/scala/StreamingConversions.scala        | 40 -----------------
 .../api/scala/WindowedDataStream.scala          |  2 -
 .../flink/streaming/api/scala/package.scala     | 46 ++++++++++++++++++++
 .../streaming/api/scala/windowing/Time.scala    |  1 -
 12 files changed, 50 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index dc01f02..a18eb37 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -21,8 +21,7 @@ package org.apache.flink.streaming.scala.examples.windowing
 
 import java.util.concurrent.TimeUnit._
 
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.scala._
 import org.apache.flink.util.Collector
 import scala.math.{max, min}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
index e87d4a1..08c7d65 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
@@ -18,8 +18,7 @@
 
 package org.apache.flink.streaming.scala.examples.windowing
 
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala._
 import org.apache.flink.util.Collector
 import scala.util.Random
 import java.util.concurrent.TimeUnit

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
index 320bfa0..d60e796 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
@@ -25,12 +25,11 @@ import scala.reflect.ClassTag
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream}
 import org.apache.flink.streaming.api.function.co.{ CoFlatMapFunction, CoMapFunction, CoReduceFunction,
CoWindowFunction }
 import org.apache.flink.streaming.api.invokable.operator.co.{ CoFlatMapInvokable, CoMapInvokable,
CoReduceInvokable }
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.scala.StreamingConversions._
 import org.apache.flink.util.Collector
 
 class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 270b80c..6d94de7 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,10 +18,8 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
   SingleOutputStreamOperator, GroupedDataStream}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.functions.MapFunction
@@ -47,7 +45,6 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.streaming.api.scala.StreamingConversions._
 import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
 
 class DataStream[T](javaStream: JavaStream[T]) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
index 7349db6..a4156a1 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
@@ -18,9 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream
}
-import org.apache.flink.streaming.api.scala.StreamingConversions._
 
 /**
  * The SplitDataStream represents an operator that has been split using an

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index d620d5e..e300610 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -23,14 +23,12 @@ import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.functions.CrossFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
 import org.apache.flink.streaming.api.function.co.CrossWindowFunction
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.scala.StreamingConversions._
 import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
 import java.util.concurrent.TimeUnit
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 61a6109..b4565c7 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -19,14 +19,12 @@
 package org.apache.flink.streaming.api.scala
 
 import scala.reflect.ClassTag
-
 import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, SourceFunction
}
-import org.apache.flink.streaming.api.scala.StreamingConversions.javaToScalaStream
 import org.apache.flink.util.Collector
+import org.apache.flink.api.scala.ClosureCleaner
 
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index cb79e2a..32765da 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream
}
 import org.apache.flink.streaming.api.function.co.JoinWindowFunction
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.scala.StreamingConversions.javaToScalaStream
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
 import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
 import java.util.concurrent.TimeUnit

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala
deleted file mode 100644
index fb3745f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamingConversions.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
-import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
-import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream
}
-import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream
}
-
-object StreamingConversions {
-
-  implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
-    new DataStream[R](javaStream)
-
-  implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R]
=
-    new WindowedDataStream[R](javaWStream)
-
-  implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R]
=
-    new SplitDataStream[R](javaStream)
-
-  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):

-  ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index deda3d9..0b88137 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -27,13 +27,11 @@ import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.scala._
 import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
 import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream}
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.function.aggregation.SumFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
-import org.apache.flink.streaming.api.scala.StreamingConversions._
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.helper._
 import org.apache.flink.util.Collector

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
new file mode 100644
index 0000000..3604b55
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api
+
+import _root_.scala.reflect.ClassTag
+import language.experimental.macros
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
+import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream
}
+import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream
}
+
+package object scala {
+  // We have this here so that we always have generated TypeInformationS when
+  // using the Scala API
+  implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
+  
+  implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
+    new DataStream[R](javaStream)
+
+  implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R]
=
+    new WindowedDataStream[R](javaWStream)
+
+  implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R]
=
+    new SplitDataStream[R](javaStream)
+
+  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):

+  ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5daa45c2/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
index e1b9768..3581730 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.scala.windowing
 import java.util.concurrent.TimeUnit
 import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
 
-import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.commons.net.ntp.TimeStamp
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp


Mime
View raw message