Subject: Re: Socket output stream
From: Fabian Hueske <fhueske@gmail.com>
To: user@flink.apache.org
Date: Thu, 12 Mar 2015 01:36:09 +0100

It is in AbstractRichFunction [1]. RichSinkFunction extends AbstractRichFunction:

public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN>

Best, Fabian

[1] https://github.com/apache/flink/blob/583c527fc3fc693dd40b908d969f1e510ff7dfb3/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java

2015-03-12 1:28 GMT+01:00 Emmanuel <eleroy@msn.com>:
> I don't see an 'open()' function to override in the RichSinkFunction or
> the SinkFunction... so where is this open() function supposed to be?
>
> ------------------------------
> Date: Thu, 12 Mar 2015 01:17:34 +0100
> Subject: Re: Socket output stream
> From: fhueske@gmail.com
> To: user@flink.apache.org
>
> Hi Emmanuel,
>
> the open() method should be the right place for setting up the socket
> connection. It is called on the worker node before the first input arrives.
>
> Best, Fabian
>
> 2015-03-12 1:05 GMT+01:00 Emmanuel <eleroy@msn.com>:
>
> Hi Marton,
>
> Thanks for the info.
>
> I've been trying to implement a socket sink but running into 'Not
> Serializable' kind of issues.
> I was seeing in the Spark docs that this is typically an issue, where the
> socket should be created on the worker node, as it can't be serialized to
> be moved from the supervisor.
>
> http://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
> So, not sure how this would be implemented in Flink...
> My attempt (maybe very naive) looked like this:
>
> public static final class SocketSink extends RichSinkFunction<String> {
>
>     private PrintWriter out;
>
>     public SocketSink(String host, Integer port) throws IOException {
>         Socket clientSocket = new Socket(host, port);
>         out = new PrintWriter(clientSocket.getOutputStream(), true);
>     }
>
>     @Override
>     public void invoke(String s) {
>         out.println(s);
>     }
> }
>
> maybe I should just move to Kafka directly... ;/
>
> Thanks for help
>
> Emmanuel
>
> ------------------------------
> From: mbalassi@apache.org
> Date: Wed, 11 Mar 2015 16:37:41 +0100
> Subject: Fwd: Flink questions
> To: eleroy@msn.com
> CC: rmetzger@apache.org; hsaputra@apache.org; user@flink.apache.org
>
> Dear Emmanuel,
>
> I'm Marton, one of the Flink Streaming developers - Robert forwarded your
> issue to me. Thanks for trying out our project.
>
> 1) Debugging: TaskManager logs are currently not forwarded to the UI, but
> you can find them on the taskmanager machines in the log folder of your
> Flink distribution. We have this issue on our agenda in the very near
> future - they need to be accessible from the UI.
>
> 2) Output to socket: Currently we do not have a preimplemented sink for
> sockets (although we offer a socket source and sinks writing to Apache
> Kafka, Flume and RabbitMQ). You can easily implement a socket sink by
> extending the abstract RichSinkFunction class though. [1]
>
> For using that you can simply say dataStream.addSink(new MySinkFunction()) -
> in that you can bring up a socket or any other service. You would create a
> socket in the open method and then in the invoke method you would write
> every value out to it.
>
> I do agree that this is a nice tool to have so I have opened a JIRA ticket
> for it. [2]
>
> 3) Internal data format: Robert was kind enough to offer a more detailed
> answer on this issue. In general streaming sinks support any file output
> that is supported by batch Flink, including Avro. You can use this
> functionality via dataStream.addSink(new FileSinkFunction<>(OutputFormat)).
>
> [1] http://ci.apache.org/projects/flink/flink-docs-master/streaming_guide.html#connecting-to-the-outside-world
> [2] https://issues.apache.org/jira/browse/FLINK-1688
>
> Best,
>
> Marton
>
> *From:* Emmanuel <eleroy@msn.com>
> *Date:* 11 March 2015 14:59:31 CET
> *To:* Robert Metzger <rmetzger@apache.org>, Henry Saputra <hsaputra@apache.org>
> *Subject:* *Flink questions*
>
> Hello,
>
> Thanks again for the help yesterday: the simple things go a long way to
> get me moving...
>
> I have more questions I hope I can get your opinion and input about:
>
> *Debugging:*
> What's the preferred or recommended way to proceed?
> I have been using some System.out.println() statements in my simple test
> code, and the results are confusing:
> First, in the UI, the logs shown are jobmanager.out, but there is never
> anything there; wherever I see output in a log it's in the taskmanager.out
> file.
> Also, even more confusing is the fact that oftentimes I just get no log
> at all... the UI says the topology is running, but nothing gets printed
> out...
> Is there a process you'd recommend to follow to debug properly with logs?
>
> *Output to socket*
> Ideally I'd like to print out to a socket/stream and read from another
> machine so as not to choke the node with disk I/Os when testing
> performance. Not sure how to do that.
>
> *Internal Data format*
> Finally, a practical question about data format: we ingest JSON, which is
> not convenient and uses a lot of space. Internally Java/Scala prefers
> Tuples, and we were thinking of using Protobufs.
> There is also Avro that could do this as I understand it... What would be
> the recommended way to format data internally?
>
> Thanks for your input.
>
> Cheers
> Emmanuel
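[Editor's note] The advice in the thread above (keep only host/port in the serialized sink object, create the socket in open() on the worker) can be illustrated with a standalone plain-Java sketch. This is a hedged illustration, not the thread's actual resolution: the classes NaiveSink and LazySink and the port 9999 are made up, and the sketch deliberately does not extend Flink's RichSinkFunction so it runs without Flink on the classpath. In real Flink code, open() would have the signature open(Configuration) and be called by the runtime before the first record arrives, as Fabian describes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.Socket;

public class SocketSinkDemo {

    // Shape of the attempt from the thread: the live PrintWriter is a
    // plain instance field, so serializing the sink drags it along.
    // PrintWriter is not Serializable, so shipping this object fails.
    static class NaiveSink implements Serializable {
        private PrintWriter out;
        NaiveSink(PrintWriter out) { this.out = out; }
    }

    // Pattern recommended in the thread: serialize only host/port, mark
    // the connection transient, and build it later in open() on the worker.
    static class LazySink implements Serializable {
        private final String host;
        private final int port;
        private transient Socket socket;   // created in open(), never serialized
        private transient PrintWriter out;

        LazySink(String host, int port) { this.host = host; this.port = port; }

        // In Flink this would be open(Configuration), invoked by the runtime.
        void open() throws IOException {
            socket = new Socket(host, port);
            out = new PrintWriter(socket.getOutputStream(), true);
        }
    }

    // Round-trips an object through Java serialization, reporting success.
    static boolean serializable(Object o) {
        try (ObjectOutputStream oos =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (IOException e) {  // e.g. NotSerializableException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(serializable(new NaiveSink(new PrintWriter(System.out))));
        System.out.println(serializable(new LazySink("localhost", 9999)));
        // prints: false, then true
    }
}
```

The same principle is why moving the Socket construction from SocketSink's constructor into an overridden open() resolves the 'Not Serializable' errors: the function object shipped to the worker then contains only the serializable connection parameters.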