flink-dev mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: How to use static data with streams?
Date Thu, 05 Nov 2015 14:47:51 GMT
Hi Ali,

Great, using the start-local-streaming.sh script sounds right.

I can explain why your first approach didn't work:

You were trying to send the CSV files from the Flink client to the cluster
using our RPC system (Akka). When you submit a job to Flink, we serialize
all the objects the user created (mappers, sources, ...) and send them to the
cluster.
There is a method StreamExecutionEnvironment.fromElements(..) which allows
users to serialize a few objects along with the job submission. But the
amount of data you can transfer like this is limited by the Akka frame
size. In our case I think the default is 10 megabytes.
After that, Akka will probably just drop or reject the deployment message.
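
A rough way to check whether a lookup table would fit under that limit is to measure its Java-serialized size before submitting the job. The sketch below is illustrative only: it assumes the table is a HashMap serialized with plain Java serialization, assumes the 10 megabyte figure mentioned above, and uses made-up names (FrameSizeCheck, buildTable) and synthetic 15-digit IMSIs rather than anything from the actual job.

```java
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class FrameSizeCheck {
    // Assumed limit: the 10 megabyte default mentioned above (10 * 1024 * 1024 bytes).
    static final long ASSUMED_FRAME_SIZE_BYTES = 10L * 1024 * 1024;

    // Builds a hypothetical IMSI -> group table with `count` distinct 15-digit keys.
    static Map<String, String> buildTable(int count) {
        Map<String, String> table = new HashMap<>();
        for (int i = 0; i < count; i++) {
            table.put(String.format("%015d", i), "group-" + (i % 10));
        }
        return table;
    }

    // Counts the bytes Java serialization would write, without buffering them.
    static long serializedSize(Object value) {
        final long[] counter = {0};
        OutputStream sink = new OutputStream() {
            @Override public void write(int b) { counter[0]++; }
            @Override public void write(byte[] b, int off, int len) { counter[0] += len; }
        };
        try (ObjectOutputStream out = new ObjectOutputStream(sink)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counter[0];
    }

    // True if the serialized object stays within the assumed frame size.
    static boolean fitsInFrame(Object value) {
        return serializedSize(value) <= ASSUMED_FRAME_SIZE_BYTES;
    }

    public static void main(String[] args) {
        // Subscriber counts taken from the two runs quoted below.
        for (int count : new int[] {200_000, 1_173_547}) {
            Map<String, String> table = buildTable(count);
            System.out.println(count + " entries: " + serializedSize(table)
                    + " bytes, fits=" + fitsInFrame(table));
        }
    }
}
```

The exact numbers depend on what each entry holds, so treat the output as an order-of-magnitude estimate, not a prediction of what Akka will accept.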

I'm pretty sure the approach I've suggested will resolve the issue.
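
A minimal sketch of that approach, kept free of Flink dependencies so it runs on its own: the table is loaded once in open() and then used for every record in map(). In a real job the class would extend Flink's RichMapFunction<String, String>, open() would take Flink's Configuration argument, and the reader would come from HDFS or another shared file system. The class name, the "imsi,group" CSV layout, and the "imsi;payload" packet format are assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Stand-alone stand-in for a RichMapFunction<String, String> (hypothetical name).
class ImsiGroupEnricher {
    // Marked transient so that, in a real Flink function, the table is built on
    // the worker and never serialized along with the job submission.
    private transient Map<String, String> groupByImsi;

    // Mirrors open(): called once before the first record arrives.
    // Assumed CSV layout: one "imsi,group" pair per line.
    void open(Reader csv) {
        groupByImsi = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(csv)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] cols = line.split(",", 2);
                if (cols.length == 2) {
                    groupByImsi.put(cols[0].trim(), cols[1].trim());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mirrors map(): called per record. Assumed packet format: "imsi;payload".
    // Appends the subscriber group, or "unknown" if the IMSI is not in the table.
    String map(String packet) {
        String imsi = packet.split(";", 2)[0];
        String group = groupByImsi.getOrDefault(imsi, "unknown");
        return packet + ";" + group;
    }

    public static void main(String[] args) {
        ImsiGroupEnricher enricher = new ImsiGroupEnricher();
        enricher.open(new StringReader("001010123456789,gold\n001010123456790,silver\n"));
        System.out.println(enricher.map("001010123456789;payload"));
    }
}
```

Because the map is populated inside open(), only the small function object travels with the job; the CSV data itself is read on the cluster side.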

Please let me know if you need further assistance.

Regards,
Robert



On Thu, Nov 5, 2015 at 3:39 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:

> I did not load the CSV file using the approach you suggested. I was
> loading it outside the operators (at the beginning of the main method of
> my class), since the file will be needed by multiple operators for sure.
> When the file was small, I saw the job registered and started, but when I
> used a big CSV file, the job never got registered with the task manager (I
> tried the 'list' command and got nothing).
>
> Here’s what I saw with the small(ish) file:
>
> # flink run analytics-flink.jar 19001 minisubs.csv output.csv
> loaded 200000 subscribers from csv file
> 11/02/2015 16:36:59 Job execution switched to status RUNNING.
> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
> Sink(1/1) switched to SCHEDULED
> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
> Sink(1/1) switched to DEPLOYING
> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
> Sink(1/1) switched to RUNNING
>
>
> And here’s what I saw with the big file:
>
> # flink run analytics-flink.jar 19001 subs.csv output.csv
> loaded 1173547 subscribers from csv file
>
>
> I’m already using the streaming mode. I’m running a single Flink node
> right now on Centos 7 using the ‘start-local-streaming.sh’ script.
>
> Thanks,
> Ali
>
> On 2015-11-05, 10:22 AM, "Robert Metzger" <rmetzger@apache.org> wrote:
>
> >Okay.
> >
> >You should be able to implement it as you described initially. I would do
> >the transformation in a map() operator of Flink. The RichMapFunction
> >provides you with an open() method which is called before the first record
> >arrives.
> >In the open() method, I would read the csv file(s) from HDFS or another
> >file system accessible by all nodes.
> >
> >Then, you can access the data from the files in the map operator.
> >
> >To make the best use of memory, I would recommend starting Flink in
> >"streaming" mode (the -st argument on YARN). With that enabled, we provide
> >more memory to streaming operators.
> >Also, I would only expose one processing slot per TaskManager; this way we
> >ensure that the files are only read once per TaskManager. (Make sure you
> >have only one TaskManager per machine.)
> >
> >Why did your previous approach fail? Do you still have the error message?
> >
> >Regards,
> >Robert
> >
> >On Thu, Nov 5, 2015 at 3:02 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
> >
> >> Hi Robert,
> >>
> >> The CSV file (or files as there will definitely be more than one) can be
> >> large (let's say 1 GB). Memory is not an issue though. Each node has at
> >> least 64 GB RAM mounted. The CSV files should easily fit in the memory of
> >> each node.
> >>
> >> Regards,
> >> Ali
> >>
> >>
> >>
> >> On 2015-11-05, 6:30 AM, "Robert Metzger" <rmetzger@apache.org> wrote:
> >>
> >> >Hi Ali,
> >> >
> >> >I'm excited to hear that EMC is looking into Apache Flink. I think the
> >> >solution to this problem depends on one question: What is the size of the
> >> >data in the CSV file compared to the memory you have available in the
> >> >cluster?
> >> >Would the mapping table from the file fit into the memory of all nodes
> >> >running Flink?
> >> >
> >> >Regards,
> >> >Robert
> >> >
> >> >PS: Did you subscribe to the mailing list? I've CCed you in case you're
> >> >not subscribed yet
> >> >
> >> >On Wed, Nov 4, 2015 at 4:54 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
> >> >
> >> >> Hi there,
> >> >>
> >> >> I'm trying to design and implement a use case in Flink where I'm receiving
> >> >> protocol packets over a socket. Each packet has the subscriber IMSI in it
> >> >> and a bunch of other data. At the same time, I have a csv file with a
> >> >> mapping from IMSI -> subscriber group. I need to inject the group into
> >> >> the packet and then send it to the sink.
> >> >>
> >> >> I've tried loading the CSV into a memory map and then accessing the map
> >> >> from within the Flink operators but that only works when the CSV is very
> >> >> small (a few hundred subscribers). I've tried creating another stream for
> >> >> the CSV and connecting the streams but that doesn't yield anything as I
> >> >> can't have access to objects from both streams at the same time.
> >> >>
> >> >> How would you guys approach this?
> >> >>
> >> >> Thanks,
> >> >> Ali
> >> >>
> >>
> >>
>
>
