flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Socket output stream
Date Thu, 12 Mar 2015 00:36:09 GMT
It is in AbstractRichFunction [1].

RichSinkFunction extends AbstractRichFunction:
public abstract class RichSinkFunction<IN> extends AbstractRichFunction
implements SinkFunction<IN>
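
For reference, here is a minimal sketch of the pattern discussed in this thread: keep only serializable configuration (host and port) in the constructor, and create the socket in open(), which runs on the worker. To keep it runnable without Flink on the classpath, RichSinkStandIn is a hypothetical stand-in for RichSinkFunction, and ship() and demo() are illustration-only helpers that imitate shipping the function by serializing and deserializing it. In the Flink version linked in [1], open() takes a Configuration argument, so the signature would need adjusting when extending the real class.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;

final class SocketSinkSketch {

    /** Hypothetical stand-in for Flink's RichSinkFunction (not the real class). */
    abstract static class RichSinkStandIn<IN> implements Serializable {
        public void open() throws Exception {}
        public abstract void invoke(IN value) throws Exception;
        public void close() throws Exception {}
    }

    static final class SocketSink extends RichSinkStandIn<String> {
        private static final long serialVersionUID = 1L;

        // Only plain, serializable configuration is stored at construction time.
        private final String host;
        private final int port;

        // Created on the worker in open(); transient so they are never serialized.
        private transient Socket socket;
        private transient PrintWriter out;

        SocketSink(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public void open() throws Exception {
            socket = new Socket(host, port);
            out = new PrintWriter(socket.getOutputStream(), true); // autoflush on println
        }

        @Override
        public void invoke(String value) {
            out.println(value);
        }

        @Override
        public void close() throws Exception {
            if (out != null) out.close();
            if (socket != null) socket.close();
        }
    }

    /** Simulates shipping the function to a worker: serialize, then deserialize. */
    static SocketSink ship(SocketSink sink) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(sink);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SocketSink) ois.readObject();
        }
    }

    /** Sends one message through a shipped sink to a local test server and returns what arrived. */
    static String demo(String message) {
        try (ServerSocket server = new ServerSocket(0)) { // port 0: any free port
            SocketSink sink = ship(new SocketSink("127.0.0.1", server.getLocalPort()));
            sink.open();
            sink.invoke(message);
            try (Socket peer = server.accept();
                 BufferedReader in =
                     new BufferedReader(new InputStreamReader(peer.getInputStream()))) {
                String received = in.readLine();
                sink.close();
                return received;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("hello"));
    }
}
```

The attempt quoted below opens the socket in the constructor and keeps the PrintWriter in a non-transient field, which would explain the 'Not Serializable' errors: the function object is serialized before it is sent to the workers, and a PrintWriter cannot be serialized.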

Best, Fabian

[1]
https://github.com/apache/flink/blob/583c527fc3fc693dd40b908d969f1e510ff7dfb3/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java

2015-03-12 1:28 GMT+01:00 Emmanuel <eleroy@msn.com>:

> I don't see an 'open()' function to override in the RichSinkFunction or
> the SinkFunction... so where is this open() function supposed to be?
>
>
> ------------------------------
> Date: Thu, 12 Mar 2015 01:17:34 +0100
> Subject: Re: Socket output stream
> From: fhueske@gmail.com
> To: user@flink.apache.org
>
>
> Hi Emmanuel,
>
> the open() method should be the right place for setting up the socket
> connection. It is called on the worker node before the first input arrives.
>
> Best, Fabian
>
> 2015-03-12 1:05 GMT+01:00 Emmanuel <eleroy@msn.com>:
>
> Hi Marton,
>
> Thanks for the info.
>
> I've been trying to implement a socket sink but running into 'Not
> Serializable' kind of issues.
> I was seeing in the Spark docs that this is typically an issue, where the
> socket should be created on the worker node, as it can't be serialized to
> be moved from the supervisor.
>
> http://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
> So, not sure how this would be implemented in Flink...
> My attempt (maybe very naive) looked like this:
>
> public static final class SocketSink extends RichSinkFunction<String> {
>
>     private PrintWriter out;
>
>     public SocketSink(String host, Integer port) throws IOException {
>         Socket clientSocket = new Socket(host,port);
>         out = new PrintWriter(clientSocket.getOutputStream(), true);
>     }
>
>     @Override
>     public void invoke(String s) {
>         out.println(s);
>     }
> }
>
>
> Maybe I should just move to Kafka directly... ;/
>
> Thanks for the help
>
> Emmanuel
>
>
>
> ------------------------------
> From: mbalassi@apache.org
> Date: Wed, 11 Mar 2015 16:37:41 +0100
> Subject: Fwd: Flink questions
> To: eleroy@msn.com
> CC: rmetzger@apache.org; hsaputra@apache.org; user@flink.apache.org
>
> Dear Emmanuel,
>
> I'm Marton, one of the Flink Streaming developers - Robert forwarded your
> issue to me. Thanks for trying out our project.
>
> 1) Debugging: TaskManager logs are currently not forwarded to the UI, but
> you can find them on the taskmanager machines in the log folder of your
> Flink distribution. Making them accessible from the UI is on our agenda
> for the very near future.
>
> 2) Output to socket: Currently we do not have a preimplemented sink for
> sockets (although we offer a socket source and sinks writing to Apache
> Kafka, Flume and RabbitMQ). You can easily implement a socket sink by
> extending the abstract RichSinkFunction class though. [1]
>
> To use it, you can simply call dataStream.addSink(new MySinkFunction()).
> In that function you can bring up a socket or any other service: you would
> create the socket in the open() method and then write every value out to
> it in the invoke() method.
>
> I do agree that this is a nice tool to have so I have opened a JIRA ticket
> for it. [2]
>
> 3) Internal data format: Robert was kind enough to offer a more detailed
> answer on this issue. In general streaming sinks support any file output
> that is supported by batch Flink including Avro. You can use this
> functionality by dataStream.addSink(new FileSinkFunction<>(OutputFormat)).
>
> [1]
> http://ci.apache.org/projects/flink/flink-docs-master/streaming_guide.html#connecting-to-the-outside-world
> [2] https://issues.apache.org/jira/browse/FLINK-1688
>
> Best,
>
> Marton
>
> *From:* Emmanuel <eleroy@msn.com>
> *Date:* 11 March 2015 14:59:31 CET
> *To:* Robert Metzger <rmetzger@apache.org>, Henry Saputra <
> hsaputra@apache.org>
> *Subject:* *Flink questions*
>
> Hello,
>
>
>
> Thanks again for the help yesterday: the simple things go a long way to
> get me moving...
>
> I have more questions that I hope to get your opinion and input on:
>
> *Debugging:*
> What's the preferred or recommended way to proceed?
> I have been using some System.out.println() statements in my simple test
> code, and the results are confusing:
> First, the UI shows the log for jobmanager.out, but there is never
> anything there; whenever I see output in a log, it is in the
> taskmanager.out file.
> Even more confusing, I often get no log output at all: the UI says the
> topology is running, but nothing gets printed.
> Is there a process you'd recommend to follow to debug properly with logs?
>
> *Output to socket*
> Ideally I'd like to print out to a socket/stream and read from another
> machine so as not to choke the node with disk I/O when testing
> performance. Not sure how to do that.
>
> *Internal Data format*
> Finally, a practical question about data format: we ingest JSON, which is
> inconvenient and uses a lot of space. Internally, Java/Scala prefers
> Tuples, and we were thinking of using Protocol Buffers.
> There is also Avro that could do this as I understand it... What would be
> the recommended way to format data internally?
>
> Thanks for your input.
>
> Cheers
> Emmanuel
>
>
>
>
>
