flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-2070] [core] Deprecate "print(prefix)" methods and add "printOnTaskManager(prefix)" method.
Date Thu, 04 Jun 2015 00:15:23 GMT
Repository: flink
Updated Branches:
  refs/heads/master fad979532 -> 11643c0cc


[FLINK-2070] [core] Deprecate "print(prefix)" methods and add "printOnTaskManager(prefix)" method.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11643c0c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11643c0c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11643c0c

Branch: refs/heads/master
Commit: 11643c0cc79eabe02e952e6fbd56d7a55166b623
Parents: fad9795
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Jun 3 22:52:00 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jun 4 00:10:15 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java | 69 ++++++++++++++------
 .../org/apache/flink/api/scala/DataSet.scala    | 26 ++++++++
 2 files changed, 76 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/11643c0c/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 157c666..f1eed3e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -377,18 +377,15 @@ public abstract class DataSet<T> {
 	}
 
 	/**
-	 * Convenience method to get the count (number of elements) of a DataSet
+	 * Convenience method to get the count (number of elements) of a DataSet.
 	 *
-	 * @return A long integer that represents the number of elements in the set
-	 *
-	 * @see org.apache.flink.api.java.Utils.CountHelper
+	 * @return A long integer that represents the number of elements in the data set.
 	 */
 	public long count() throws Exception {
-
 		final String id = new AbstractID().toString();
 
-		flatMap(new Utils.CountHelper<T>(id)).output(
-				new DiscardingOutputFormat<Long>());
+		flatMap(new Utils.CountHelper<T>(id)).name("count()")
+				.output(new DiscardingOutputFormat<Long>()).name("count() sink");
 
 		JobExecutionResult res = getExecutionEnvironment().execute();
 		return res.<Long> getAccumulatorResult(id);
@@ -396,18 +393,17 @@ public abstract class DataSet<T> {
 
 
 	/**
-	 * Convenience method to get the elements of a DataSet as a List
+	 * Convenience method to get the elements of a DataSet as a List.
 	 * As DataSet can contain a lot of data, this method should be used with caution.
 	 *
 	 * @return A List containing the elements of the DataSet
-	 *
-	 * @see org.apache.flink.api.java.Utils.CollectHelper
 	 */
 	public List<T> collect() throws Exception {
 		final String id = new AbstractID().toString();
 		final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());
 		
-		this.flatMap(new Utils.CollectHelper<T>(id, serializer)).output(new DiscardingOutputFormat<T>());
+		this.flatMap(new Utils.CollectHelper<T>(id, serializer)).name("collect()")
+				.output(new DiscardingOutputFormat<T>()).name("collect() sink");
 		JobExecutionResult res = getExecutionEnvironment().execute();
 
 		ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
@@ -1341,9 +1337,12 @@ public abstract class DataSet<T> {
 	 * 
 	 * <p>This method immediately triggers the program execution, similar to the
 	 * {@link #collect()} and {@link #count()} methods.</p>
+	 * 
+	 * @see #printToErr()
+	 * @see #printOnTaskManager(String)
 	 */
 	public void print() throws Exception {
-		List<T> elements = this.collect();
+		List<T> elements = collect();
 		for (T e: elements) {
 			System.out.println(e);
 		}
@@ -1358,32 +1357,64 @@ public abstract class DataSet<T> {
 	 *
 	 * <p>This method immediately triggers the program execution, similar to the
 	 * {@link #collect()} and {@link #count()} methods.</p>
+	 * 
+	 * @see #print()
+	 * @see #printOnTaskManager(String)
 	 */
 	public void printToErr() throws Exception {
-		List<T> elements = this.collect();
+		List<T> elements = collect();
 		for (T e: elements) {
 			System.err.println(e);
 		}
 	}
+
+	/**
+	 * Writes a DataSet to the standard output streams (stdout) of the TaskManagers that execute
+	 * the program (or more specifically, the data sink operators). On a typical cluster setup, the
+	 * data will appear in the TaskManagers' <i>.out</i> files.
+	 * 
+	 * <p>To print the data to the console or stdout stream of the client process instead, use the
+	 * {@link #print()} method.</p>
+	 * 
+	 * <p>For each element of the DataSet the result of {@link Object#toString()} is written.</p>
+	 *
+	 * @param prefix The string to prefix each line of the output with. This helps identifying outputs
+	 *               from different printing sinks.   
+	 * @return The DataSink operator that writes the DataSet.
+	 *  
+	 * @see #print()
+	 */
+	public DataSink<T> printOnTaskManager(String prefix) {
+		return output(new PrintingOutputFormat<T>(prefix, false));
+	}
 	
 	/**
-	 * Writes a DataSet to the standard output stream (stdout).<br/>
-	 * For each element of the DataSet the result of {@link Object#toString()} is written.
+	 * Writes a DataSet to the standard output stream (stdout).
+	 * 
+	 * <p>For each element of the DataSet the result of {@link Object#toString()} is written.</p>
 	 *
-	 *  @param sinkIdentifier The string to prefix the output with.
-	 *  @return The DataSink that writes the DataSet.
+	 * @param sinkIdentifier The string to prefix the output with.
+	 * @return The DataSink that writes the DataSet.
+	 * 
+	 * @deprecated Use {@link #printOnTaskManager(String)} instead.
 	 */
+	@Deprecated
 	public DataSink<T> print(String sinkIdentifier) {
 		return output(new PrintingOutputFormat<T>(sinkIdentifier, false));
 	}
 
 	/**
-	 * Writes a DataSet to the standard error stream (stderr).<br/>
-	 * For each element of the DataSet the result of {@link Object#toString()} is written.
+	 * Writes a DataSet to the standard error stream (stderr).
+	 * 
+	 * <p>For each element of the DataSet the result of {@link Object#toString()} is written.</p>
 	 *
 	 * @param sinkIdentifier The string to prefix the output with.
 	 * @return The DataSink that writes the DataSet.
+	 * 
+	 * @deprecated Use {@link #printOnTaskManager(String)} instead, or the
+	 *             {@link PrintingOutputFormat}.
 	 */
+	@Deprecated
 	public DataSink<T> printToErr(String sinkIdentifier) {
 		return output(new PrintingOutputFormat<T>(sinkIdentifier, true));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/11643c0c/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index e283e95..2ade2bc 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1349,13 +1349,35 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def printToErr(): Unit = {
     javaSet.printToErr()
   }
+
+  /**
+   * Writes a DataSet to the standard output streams (stdout) of the TaskManagers that execute
+   * the program (or more specifically, the data sink operators). On a typical cluster setup, the
+   * data will appear in the TaskManagers' <i>.out</i> files.
+   *
+   * To print the data to the console or stdout stream of the client process instead, use the
+   * [[print()]] method.
+   *
+   * For each element of the DataSet the result of [[AnyRef.toString()]] is written.
+   *
+   * @param prefix The string to prefix each line of the output with. This helps identifying outputs
+   *               from different printing sinks.   
+   * @return The DataSink operator that writes the DataSet.
+   */
+  def printOnTaskManager(prefix: String): DataSink[T] = {
+    javaSet.printOnTaskManager(prefix)
+  }
   
   /**
    * *
    * Writes a DataSet to the standard output stream (stdout) with a sink identifier prefixed.
    * This uses [[AnyRef.toString]] on each element.
    * @param sinkIdentifier The string to prefix the output with.
+   * 
+   * @deprecated Use [[printOnTaskManager(String)]] instead.
    */
+  @Deprecated
+  @deprecated
   def print(sinkIdentifier: String): DataSink[T] = {
     output(new PrintingOutputFormat[T](sinkIdentifier, false))
   }
@@ -1364,7 +1386,11 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * Writes a DataSet to the standard error stream (stderr) with a sink identifier prefixed.
    * This uses [[AnyRef.toString]] on each element.
    * @param sinkIdentifier The string to prefix the output with.
+   * 
+   * @deprecated Use [[printOnTaskManager(String)]] instead.
    */
+  @Deprecated
+  @deprecated
   def printToErr(sinkIdentifier: String): DataSink[T] = {
       output(new PrintingOutputFormat[T](sinkIdentifier, true))
   }


Mime
View raw message