flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [04/51] [abbrv] git commit: [streaming] API javadoc + StreamRecordSerializer update
Date Mon, 18 Aug 2014 17:25:41 GMT
[streaming] API javadoc + StreamRecordSerializer update


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5dbf8152
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5dbf8152
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5dbf8152

Branch: refs/heads/master
Commit: 5dbf81521d6c8b5b7937e041a9f9e8b0327dc345
Parents: 11f62c1
Author: gyfora <gyula.fora@gmail.com>
Authored: Mon Jul 21 12:49:56 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 18 16:14:13 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/DataStream.java    | 18 ++++++++++++++----
 .../api/streamcomponent/StreamIterationSink.java  |  2 +-
 .../streaming/api/streamrecord/StreamRecord.java  |  4 ++--
 .../api/streamrecord/StreamRecordSerializer.java  |  3 +--
 .../org/apache/flink/streaming/api/PrintTest.java |  2 +-
 5 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5dbf8152/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index d327fad..e67ac0e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -129,9 +129,19 @@ public class DataStream<T extends Tuple> {
 	public String getId() {
 		return id;
 	}
-	
-	public DataStream<T> setMutability(boolean isMutable){
-		environment.setMutability(this,isMutable);
+
+	/**
+	 * Sets the mutability of the operator represented by the DataStream. If the
+	 * operator is set to mutable, the tuples received in the user defined
+	 * functions, will be reused after the function call. Setting an operator to
+	 * mutable greatly reduces garbage collection overhead and thus scalability.
+	 * 
+	 * @param isMutable
+	 *            The mutability of the operator.
+	 * @return The DataStream with mutability set.
+	 */
+	public DataStream<T> setMutability(boolean isMutable) {
+		environment.setMutability(this, isMutable);
 		return this;
 	}
 
@@ -400,7 +410,7 @@ public class DataStream<T extends Tuple> {
 	public DataStream<T> print() {
 		return environment.print(new DataStream<T>(this));
 	}
-	
+
 	/**
 	 * Writes a DataStream to the file specified by path in text format. For
 	 * every element of the DataStream the result of {@link Object#toString()}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5dbf8152/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index b9d44c3..b92e031 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -79,7 +79,7 @@ public class StreamIterationSink<IN extends Tuple> extends AbstractStreamCompone
 		while ((reuse = inputIter.next(reuse)) != null) {
 			pushToQueue(reuse);
 			// TODO: Fix object reuse for iteration
-			reuse = inTupleSerializer.createInstance().setId(0);
+			reuse = inTupleSerializer.createInstance();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5dbf8152/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
index 5021013..9de2f38 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
@@ -31,13 +31,13 @@ import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 public class StreamRecord<T extends Tuple> implements Serializable {
 	private static final long serialVersionUID = 1L;
 
-	protected UID uid;
+	private UID uid;
 	private T tuple;
 
 	protected TupleSerializer<T> tupleSerializer;
 
 	public StreamRecord() {
-
+		uid = new UID();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5dbf8152/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index b367333..66c6751 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -50,9 +50,8 @@ public final class StreamRecordSerializer<T extends Tuple> extends
TypeSerialize
 	public StreamRecord<T> createInstance() {
 		try {
 			@SuppressWarnings("unchecked")
-			StreamRecord<T> t = StreamRecord.class.newInstance().setId(0);
+			StreamRecord<T> t = StreamRecord.class.newInstance();
 			t.setTuple(tupleSerializer.createInstance());
-
 			return t;
 		} catch (Exception e) {
 			throw new RuntimeException("Cannot instantiate StreamRecord.", e);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5dbf8152/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index c70cca3..1c43a66 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -78,7 +78,7 @@ public class PrintTest {
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		env.generateSequence(1, 10).print();
-		env.execute();
+		env.executeTest(MEMORYSIZE);
 
 	}
 


Mime
View raw message