flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [02/25] flink git commit: [hotfix] Make DataStream property methods properly Scalaesk
Date Fri, 26 Feb 2016 19:58:48 GMT
[hotfix] Make DataStream property methods properly Scalaesk

This also includes some minor cleanups

This closes #1689


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3c6646e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3c6646e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3c6646e

Branch: refs/heads/release-1.0
Commit: f3c6646e68750a068b3325181b8a16a4689a0fed
Parents: df19a8b
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 22 18:37:59 2016 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Fri Feb 26 20:56:24 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |   2 +-
 .../flink/streaming/api/graph/StreamNode.java   |   9 +-
 .../api/graph/StreamGraphGeneratorTest.java     |   7 +-
 .../flink/streaming/api/scala/DataStream.scala  | 108 ++++++++++++++-----
 4 files changed, 91 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3c6646e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index ddba7d6..1cd8ade 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -128,7 +128,7 @@ public class DataStream<T> {
 	 * @return ID of the DataStream
 	 */
 	@Internal
-	public Integer getId() {
+	public int getId() {
 		return transformation.getId();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f3c6646e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 8605ce0..533f1e1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -40,7 +40,7 @@ public class StreamNode implements Serializable {
 
 	transient private StreamExecutionEnvironment env;
 
-	private final Integer id;
+	private final int id;
 	private Integer parallelism = null;
 	private Long bufferTimeout = null;
 	private final String operatorName;
@@ -124,7 +124,7 @@ public class StreamNode implements Serializable {
 		return inEdgeIndices;
 	}
 
-	public Integer getId() {
+	public int getId() {
 		return id;
 	}
 
@@ -264,12 +264,11 @@ public class StreamNode implements Serializable {
 		}
 
 		StreamNode that = (StreamNode) o;
-
-		return id.equals(that.id);
+		return id == that.id;
 	}
 
 	@Override
 	public int hashCode() {
-		return id.hashCode();
+		return id;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3c6646e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 734199b..d1f92c6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -39,7 +40,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.util.EvenOddOutputSelector;
 import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/flink/blob/f3c6646e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3522b51..04a8a5f 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{PublicEvolving, Public}
+import org.apache.flink.annotation.{Internal, PublicEvolving, Public}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner}
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -43,30 +44,79 @@ import scala.collection.JavaConverters._
 class DataStream[T](stream: JavaStream[T]) {
 
   /**
-   * Gets the underlying java DataStream object.
-   */
-  def javaStream: JavaStream[T] = stream
-
-  /**
    * Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]].
    *
-   * @return associated execution environment
+   * @return associated execution environment 
+   * @deprecated Use [[executionEnvironment]] instead
    */
+  @deprecated
+  @PublicEvolving
   def getExecutionEnvironment: StreamExecutionEnvironment =
     new StreamExecutionEnvironment(stream.getExecutionEnvironment)
 
   /**
-   * Returns the ID of the DataStream.
-   *
-   * @return ID of the DataStream
+   * Returns the TypeInformation for the elements of this DataStream.
+   * 
+   * @deprecated Use [[dataType]] instead.
    */
+  @deprecated
   @PublicEvolving
-  def getId = stream.getId
+  def getType(): TypeInformation[T] = stream.getType()
 
   /**
+   * Returns the parallelism of this operation.
+   * 
+   * @deprecated Use [[parallelism]] instead.
+   */
+  @deprecated
+  @PublicEvolving
+  def getParallelism = stream.getParallelism
+
+  /**
+   * Returns the execution config.
+   * 
+   * @deprecated Use [[executionConfig]] instead.
+   */
+  @deprecated
+  @PublicEvolving
+  def getExecutionConfig = stream.getExecutionConfig
+
+  /**
+   * Returns the ID of the DataStream.
+   */
+  @Internal
+  private[flink] def getId = stream.getId()
+  
+  // --------------------------------------------------------------------------
+  //  Scalaesk accessors 
+  // --------------------------------------------------------------------------
+  
+  /**
+   * Gets the underlying java DataStream object.
+   */
+  def javaStream: JavaStream[T] = stream
+  
+  /**
    * Returns the TypeInformation for the elements of this DataStream.
    */
-  def getType(): TypeInformation[T] = stream.getType()
+  def dataType: TypeInformation[T] = stream.getType()
+
+  /**
+   * Returns the execution config.
+   */
+  def executionConfig: ExecutionConfig = stream.getExecutionConfig()
+
+  /**
+   * Returns the [[StreamExecutionEnvironment]] associated with this data stream
+   */
+  def executionEnvironment: StreamExecutionEnvironment =
+    new StreamExecutionEnvironment(stream.getExecutionEnvironment())
+  
+  
+  /**
+   * Returns the parallelism of this operation.
+   */
+  def parallelism: Int = stream.getParallelism()
 
   /**
    * Sets the parallelism of this operation. This must be at least 1.
@@ -75,34 +125,36 @@ class DataStream[T](stream: JavaStream[T]) {
     stream match {
       case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(parallelism)
       case _ =>
-        throw new UnsupportedOperationException("Operator " + stream.toString +  " cannot " +
-          "have " +
-          "parallelism.")
+        throw new UnsupportedOperationException(
+          "Operator " + stream + " cannot set the parallelism.")
     }
     this
   }
 
   /**
-   * Returns the parallelism of this operation.
-   */
-  def getParallelism = stream.getParallelism
-
-  /**
-   * Returns the execution config.
-   */
-  def getExecutionConfig = stream.getExecutionConfig
-
-  /**
    * Gets the name of the current data stream. This name is
    * used by the visualization and logging during runtime.
    *
    * @return Name of the stream.
    */
-  def getName : String = stream match {
+  def name: String = stream match {
     case stream : SingleOutputStreamOperator[T,_] => stream.getName
     case _ => throw new
         UnsupportedOperationException("Only supported for operators.")
   }
+  
+  // --------------------------------------------------------------------------
+  
+  /**
+   * Gets the name of the current data stream. This name is
+   * used by the visualization and logging during runtime.
+   *
+   * @return Name of the stream.
+   * @deprecated Use [[name]] instead
+   */
+  @deprecated
+  @PublicEvolving
+  def getName : String = name
 
   /**
    * Sets the name of the current data stream. This name is
@@ -209,6 +261,10 @@ class DataStream[T](stream: JavaStream[T]) {
     this
   }
 
+  // --------------------------------------------------------------------------
+  //  Stream Transformations 
+  // --------------------------------------------------------------------------
+  
   /**
    * Creates a new DataStream by merging DataStream outputs of
    * the same type with each other. The DataStreams merged using this operator


Mime
View raw message