flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-1711] Converted all usages of commons validate to guava checks(for Java classes), scala predef require(for scala classes)
Date Fri, 15 May 2015 13:28:45 GMT
Repository: flink
Updated Branches:
  refs/heads/master b335f5879 -> f950def56


http://git-wip-us.apache.org/repos/asf/flink/blob/f950def5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index e93ec72..7f10941 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.datastream;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -87,6 +86,8 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A DataStream represents a stream of elements of the same type. A DataStream
  * can be transformed into another DataStream by applying a transformation as
@@ -1081,7 +1082,7 @@ public class DataStream<OUT> {
 	 */
 	@SuppressWarnings("unchecked")
 	public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path) {
-		Validate.isTrue(getType().isTupleType(),
+		Preconditions.checkArgument(getType().isTupleType(),
 				"The writeAsCsv() method can only be used on data sets of tuples.");
 		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
 				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
@@ -1103,7 +1104,7 @@ public class DataStream<OUT> {
 	 */
 	@SuppressWarnings("unchecked")
 	public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, long millis) {
-		Validate.isTrue(getType().isTupleType(),
+		Preconditions.checkArgument(getType().isTupleType(),
 				"The writeAsCsv() method can only be used on data sets of tuples.");
 		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
 				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
@@ -1125,7 +1126,7 @@ public class DataStream<OUT> {
 	 */
 	@SuppressWarnings("unchecked")
 	public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, WriteMode writeMode) {
-		Validate.isTrue(getType().isTupleType(),
+		Preconditions.checkArgument(getType().isTupleType(),
 				"The writeAsCsv() method can only be used on data sets of tuples.");
 		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
 				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
@@ -1154,7 +1155,7 @@ public class DataStream<OUT> {
 	@SuppressWarnings("unchecked")
 	public <X extends Tuple> DataStreamSink<OUT> writeAsCsv(String path, WriteMode writeMode,
 			long millis) {
-		Validate.isTrue(getType().isTupleType(),
+		Preconditions.checkArgument(getType().isTupleType(),
 				"The writeAsCsv() method can only be used on data sets of tuples.");
 		CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
 				CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);

http://git-wip-us.apache.org/repos/asf/flink/blob/f950def5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2e33b82..c92bd49 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -59,6 +58,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 
 import com.esotericsoftware.kryo.Serializer;
+import com.google.common.base.Preconditions;
 
 /**
  * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -388,7 +388,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The DataStream representing the text file.
 	 */
 	public DataStreamSource<String> readTextFile(String filePath) {
-		Validate.notNull(filePath, "The file path may not be null.");
+		Preconditions.checkNotNull(filePath, "The file path may not be null.");
 		TextInputFormat format = new TextInputFormat(new Path(filePath));
 		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
 
@@ -405,7 +405,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The DataStream representing the text file.
 	 */
 	public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
-		Validate.notNull(filePath, "The file path may not be null.");
+		Preconditions.checkNotNull(filePath, "The file path may not be null.");
 		TextInputFormat format = new TextInputFormat(new Path(filePath));
 		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
 		format.setCharsetName(charsetName);

http://git-wip-us.apache.org/repos/asf/flink/blob/f950def5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index 699c38d..8d98e23 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -18,21 +18,20 @@
 
 package org.apache.flink.streaming.api.scala
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.flink.api.common.ExecutionConfig
-import scala.reflect.ClassTag
-import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.functions.CrossFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.scala.typeutils.CaseClassSerializer
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.CrossWindowFunction
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import java.util.concurrent.TimeUnit
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.functions.co.CrossWindowFunction
 import org.apache.flink.streaming.api.operators.co.CoStreamWindow
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+
+import scala.reflect.ClassTag
 
 class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
   TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
@@ -109,7 +108,7 @@ object StreamCrossOperator {
   private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2],
                                                        crossFunction: (I1, I2) => R):
   CrossWindowFunction[I1, I2, R] = {
-    Validate.notNull(crossFunction, "Join function must not be null.")
+    require(crossFunction != null, "Join function must not be null.")
 
     val crossFun = new CrossFunction[I1, I2, R] {
       val cleanFun = op.input1.clean(crossFunction)

http://git-wip-us.apache.org/repos/asf/flink/blob/f950def5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 15a8f1d..cbb5fb7 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -19,16 +19,15 @@
 package org.apache.flink.streaming.api.scala
 
 import com.esotericsoftware.kryo.Serializer
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
-
-import scala.reflect.ClassTag
-import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
-import org.apache.flink.streaming.api.functions.source.{ FromElementsFunction, SourceFunction }
-import org.apache.flink.util.Collector
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
+import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
+import org.apache.flink.util.Collector
+
+import scala.reflect.ClassTag
 
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
@@ -266,7 +265,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def fromCollection[T: ClassTag: TypeInformation](
     data: Seq[T]): DataStream[T] = {
-    Validate.notNull(data, "Data must not be null.")
+    require(data != null, "Data must not be null.")
     val typeInfo = implicitly[TypeInformation[T]]
 
     val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
@@ -285,7 +284,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    */
   def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
-    Validate.notNull(function, "Function must not be null.")
+    require(function != null, "Function must not be null.")
     val cleanFun = StreamExecutionEnvironment.clean(function)
     val typeInfo = implicitly[TypeInformation[T]]
     javaEnv.addSource(cleanFun).returns(typeInfo)
@@ -297,7 +296,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    */
   def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
-    Validate.notNull(function, "Function must not be null.")
+    require(function != null, "Function must not be null.")
     val sourceFunction = new SourceFunction[T] {
       val cleanFun = StreamExecutionEnvironment.clean(function)
       override def run(out: Collector[T]) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f950def5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index 510aaae..def5679 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -18,25 +18,24 @@
 
 package org.apache.flink.streaming.api.scala
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.flink.api.common.ExecutionConfig
-import scala.Array.canBuildFrom
-import scala.reflect.ClassTag
-import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.functions.JoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.operators.Keys
-import org.apache.flink.api.scala.typeutils.CaseClassSerializer
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
+import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, SingleOutputStreamOperator}
 import org.apache.flink.streaming.api.functions.co.JoinWindowFunction
+import org.apache.flink.streaming.api.operators.co.CoStreamWindow
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
-import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import java.util.concurrent.TimeUnit
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
-import org.apache.flink.streaming.api.operators.co.CoStreamWindow
+
+import scala.Array.canBuildFrom
+import scala.reflect.ClassTag
 
 class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends 
 TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
@@ -209,7 +208,7 @@ object StreamJoinOperator {
 
   private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2],
     joinFunction: (I1, I2) => R) = {
-    Validate.notNull(joinFunction, "Join function must not be null.")
+    require(joinFunction != null, "Join function must not be null.")
 
     val joinFun = new JoinFunction[I1, I2, R] {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f950def5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
index eedee0e..84286c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.scala.windowing
 
 import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
-import org.apache.commons.lang.Validate
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
 
@@ -36,7 +35,7 @@ object Delta {
    * the eviction stops.
    */
   def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = {
-    Validate.notNull(deltaFunction, "Delta function must not be null")
+    require(deltaFunction != null, "Delta function must not be null")
     val df = new DeltaFunction[T] {
       val cleanFun = clean(deltaFunction)
       override def getDelta(first: T, second: T) = cleanFun(first, second)

http://git-wip-us.apache.org/repos/asf/flink/blob/f950def5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
index 9a69369..a935440 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
 
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.helper.Timestamp
-import org.apache.commons.lang.Validate
 
 object Time {
 
@@ -43,7 +42,7 @@ object Time {
    *
    */
   def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = {
-    Validate.notNull(timestamp, "Timestamp must not be null.")
+    require(timestamp != null, "Timestamp must not be null.")
     val ts = new Timestamp[R] {
       val fun = clean(timestamp, true)
       override def getTimestamp(in: R) = fun(in)


Mime
View raw message