Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 92E6299AF for ; Wed, 17 Dec 2014 12:31:15 +0000 (UTC) Received: (qmail 66523 invoked by uid 500); 17 Dec 2014 12:31:15 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 66493 invoked by uid 500); 17 Dec 2014 12:31:15 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 66483 invoked by uid 99); 17 Dec 2014 12:31:15 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Dec 2014 12:31:15 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 17 Dec 2014 12:31:13 +0000 Received: (qmail 54966 invoked by uid 99); 17 Dec 2014 12:29:38 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Dec 2014 12:29:38 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 27E4B82E344; Wed, 17 Dec 2014 12:22:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbalassi@apache.org To: commits@flink.incubator.apache.org Date: Wed, 17 Dec 2014 12:22:29 -0000 Message-Id: <1bb05e7b8ab2483886aab107f459f367@git.apache.org> In-Reply-To: <162e281eb9c84534bb631e66d1b7468a@git.apache.org> References: <162e281eb9c84534bb631e66d1b7468a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] 
incubator-flink git commit: [streaming] DataStream print functionality update X-Virus-Checked: Checked by ClamAV on apache.org [streaming] DataStream print functionality update PrintSinkFunction now explicitly states threads in output Added printToErr functionality Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b5ac6ec7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b5ac6ec7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b5ac6ec7 Branch: refs/heads/master Commit: b5ac6ec7d97edcd224ac990c1f0314bc8acdfa9e Parents: 61b023f Author: mbalassi Authored: Tue Dec 16 12:48:09 2014 +0100 Committer: mbalassi Committed: Tue Dec 16 23:39:05 2014 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/DataStream.java | 17 ++- .../api/function/sink/PrintSinkFunction.java | 128 ++++++++++++++----- .../streamvertex/StreamingRuntimeContext.java | 2 +- 3 files changed, 112 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b5ac6ec7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 978f5fa..3fc685a 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -780,7 +780,7 @@ public class DataStream 
{ } /** - * Writes a DataStream to the standard output stream (stdout). For each + * Writes a DataStream to the standard output stream (stdout).
For each * element of the DataStream the result of {@link Object#toString()} is * written. * @@ -793,6 +793,21 @@ public class DataStream { return returnStream; } + + /** + * Writes a DataStream to the standard error stream (stderr).
For each + * element of the DataStream the result of {@link Object#toString()} is + * written. + * + * @return The closed DataStream. + */ + public DataStreamSink printToErr() { + DataStream inputStream = this.copy(); + PrintSinkFunction printFunction = new PrintSinkFunction(true); + DataStreamSink returnStream = addSink(inputStream, printFunction, getType()); + + return returnStream; + } /** * Writes a DataStream to the file specified by path in text format. For http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b5ac6ec7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java index fc75da7..d460749 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java @@ -1,36 +1,98 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. - */ - -package org.apache.flink.streaming.api.function.sink; - - -/** - * Dummy implementation of the SinkFunction writing every tuple to the standard - * output. Used for print. - * - * @param - * Input tuple type - */ -public class PrintSinkFunction implements SinkFunction { - private static final long serialVersionUID = 1L; - - @Override - public void invoke(IN tuple) { - System.out.println(tuple); - } - + */ + +package org.apache.flink.streaming.api.function.sink; + +import java.io.PrintStream; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; + +/** + * Implementation of the SinkFunction writing every tuple to the standard + * output or standard error stream. 
+ * + * @param + * Input record type + */ +public class PrintSinkFunction extends RichSinkFunction { + private static final long serialVersionUID = 1L; + + private static final boolean STD_OUT = false; + private static final boolean STD_ERR = true; + + private boolean target; + private transient PrintStream stream; + private transient String prefix; + + /** + * Instantiates a print sink function that prints to standard out. + */ + public PrintSinkFunction() {} + + /** + * Instantiates a print sink function that prints to standard out. + * + * @param stdErr True, if the format should print to standard error instead of standard out. + */ + public PrintSinkFunction(boolean stdErr) { + target = stdErr; + } + + public void setTargetToStandardOut() { + target = STD_OUT; + } + + public void setTargetToStandardErr() { + target = STD_ERR; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); + // get the target stream + stream = target == STD_OUT ? System.out : System.err; + + // set the prefix if we have a >1 DOP + prefix = (context.getNumberOfParallelSubtasks() > 1) ? + ((context.getIndexOfThisSubtask() + 1) + "> ") : null; + } + + @Override + public void invoke(IN record) { + if (prefix != null) { + stream.println(prefix + record.toString()); + } + else { + stream.println(record.toString()); + } + } + + @Override + public void close() throws Exception { + this.stream = null; + this.prefix = null; + super.close(); + } + + @Override + public String toString() { + return "Print to " + (target == STD_OUT ? 
"System.out" : "System.err"); + } + } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b5ac6ec7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java index 49cf15f..798724e 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java @@ -34,7 +34,7 @@ import org.apache.flink.streaming.state.OperatorState; */ public class StreamingRuntimeContext extends RuntimeUDFContext { - private Environment env; + public Environment env; private final Map> operatorStates; public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,