flink-dev mailing list archives

From "Kashmar, Ali" <Ali.Kash...@emc.com>
Subject Re: How to use static data with streams?
Date Thu, 05 Nov 2015 20:56:16 GMT
Hi Robert,

I tried the approach you suggested and it works nicely. Thanks!

I have a few more questions if you don’t mind:

1. Is there a way to retrieve in one stream data that's stored in another
stream? I have a location stream that I can use to store the latest
subscriber location. I have another stream that needs access to the latest
subscriber location processed by the location stream. I read a bit on
broadcast variables but they’re only available for DataSets, not
DataStreams. Did I miss a way in Flink to do this?

2. We are planning to test this on a Flink cluster of 3 nodes (1 master
and 2 slaves).
    
   a. If I use a socket stream, does each node listen for data on its
socket or is it only the job manager node? I assume it’s the latter. This
is important because I have to figure out how to make the system highly
available.
   b. Is there a way to split the afore-mentioned CSV file across the
three nodes in the cluster?

Sorry for bombarding you with questions.

Thanks,
Ali


On 2015-11-05, 10:47 AM, "Robert Metzger" <rmetzger@apache.org> wrote:

>Hi Ali,
>
>great, the start-local-streaming.sh script sounds right.
>
>I can explain why your first approach didn't work:
>
>You were trying to send the CSV files from the Flink client to the cluster
>using our RPC system (Akka). When you submit a job to Flink, we serialize
>all the objects the user created (mappers, sources, ...) and send them to
>the cluster.
>There is a method StreamExecutionEnvironment.fromElements(..) which allows
>users to serialize a few objects along with the job submission. But the
>amount of data you can transfer like this is limited by the Akka frame
>size. In our case I think the default is 10 megabytes.
>After that, Akka will probably just drop or reject the deployment message.
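
A rough way to see why a large lookup table can trip such a limit is to measure its serialized size. The sketch below is plain Java with no Flink dependency. The names `ClosureSizeCheck` and `syntheticSubscribers` and the synthetic IMSI/group values are made up for illustration. The 10 MB constant reuses the figure recalled above and is not read from any Flink configuration. Java serialization is only a stand-in for whatever Flink actually does when it ships a job.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

/** Hypothetical helper: estimates how large an object becomes when Java-serialized. */
final class ClosureSizeCheck {
    // Assumption: the "10 megabytes" mentioned in the thread, not a value read from Flink.
    static final long ASSUMED_FRAME_LIMIT_BYTES = 10L * 1024 * 1024;

    /** Serializes the object into memory and returns the number of bytes produced. */
    static long serializedSize(Serializable obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    /** True if the serialized form stays within the assumed frame limit. */
    static boolean fitsInFrame(Serializable obj) {
        return serializedSize(obj) <= ASSUMED_FRAME_LIMIT_BYTES;
    }

    /** Builds a synthetic IMSI -> group map; real subscriber data would differ. */
    static HashMap<String, String> syntheticSubscribers(int count) {
        HashMap<String, String> map = new HashMap<>();
        for (int i = 0; i < count; i++) {
            map.put(String.format("%015d", i), "group-" + (i % 10));
        }
        return map;
    }

    public static void main(String[] args) {
        // Subscriber counts reported in the thread; the entries themselves are synthetic.
        for (int count : new int[] {200_000, 1_173_547}) {
            HashMap<String, String> subscribers = syntheticSubscribers(count);
            System.out.println(count + " subscribers: "
                + serializedSize(subscribers) + " bytes, fits in assumed frame: "
                + fitsInFrame(subscribers));
        }
    }
}
```

The byte counts this prints are only indicative, since the real message also carries the rest of the job and may be encoded differently.
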
>
>I'm pretty sure the approach I've suggested will resolve the issue.
>
>Please let me know if you need further assistance.
>
>Regards,
>Robert
>
>
>
>On Thu, Nov 5, 2015 at 3:39 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>
>> I did not load the CSV file using the approach you suggested. I was
>> loading it outside the operators (at the beginning of the main method of
>> my class), since the file will be needed by multiple operators for sure.
>> When the file was small, I saw the job registered and started, but when I
>> used a big CSV file, the job never got registered with the task manager
>> (I tried the ‘list’ command and got nothing).
>>
>> Here’s what I saw with the small(ish) file:
>>
>> # flink run analytics-flink.jar 19001 minisubs.csv output.csv
>> loaded 200000 subscribers from csv file
>> 11/02/2015 16:36:59 Job execution switched to status RUNNING.
>> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
>> Sink(1/1) switched to SCHEDULED
>> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
>> Sink(1/1) switched to DEPLOYING
>> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
>> Sink(1/1) switched to RUNNING
>>
>>
>> And here’s what I saw with the big file:
>>
>> # flink run analytics-flink.jar 19001 subs.csv output.csv
>> loaded 1173547 subscribers from csv file
>>
>>
>> I’m already using the streaming mode. I’m running a single Flink node
>> right now on Centos 7 using the ‘start-local-streaming.sh’ script.
>>
>> Thanks,
>> Ali
>>
>> On 2015-11-05, 10:22 AM, "Robert Metzger" <rmetzger@apache.org> wrote:
>>
>> >Okay.
>> >
>> >You should be able to implement it as you described initially. I would do
>> >the transformation in a map() operator of Flink. The RichMapFunction
>> >provides you with an open() method which is called before the first
>> >record arrives.
>> >In the open() method, I would read the csv file(s) from HDFS or another
>> >file system accessible by all nodes.
>> >
>> >Then, you can access the data from the files in the map operator.
>> >
>> >In order to utilize the memory best, I would recommend starting Flink in
>> >the "streaming" mode (-st argument on YARN). With that enabled, we
>> >provide more memory to streaming operators.
>> >Also, I would only expose one processing slot per TaskManager; this way
>> >we ensure that the files are only read once per TaskManager (make sure
>> >you have only one TaskManager per machine).
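
The load-once-then-look-up pattern described above can be sketched in plain Java, without Flink on the classpath. Everything named here is hypothetical: the class `SubscriberGroupEnricher`, the `imsi,group` CSV layout, the `imsi,payload` packet layout, and the `UNKNOWN` fallback. In a real job the two methods would correspond to the open() and map() methods of a RichMapFunction. The file is read from a local path here, so reading from HDFS would need a different file system API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical enricher mirroring the open()/map() lifecycle of a rich map function. */
final class SubscriberGroupEnricher {
    private final String csvPath;             // only the path would travel with the job
    private Map<String, String> imsiToGroup;  // built on the worker inside open()

    SubscriberGroupEnricher(String csvPath) {
        this.csvPath = csvPath;
    }

    /** Called once per operator instance, before the first record arrives. */
    void open() {
        Map<String, String> table = new HashMap<>();
        try (BufferedReader reader =
                 Files.newBufferedReader(Paths.get(csvPath), StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Assumed layout: "imsi,group"; lines without a comma are skipped.
                String[] parts = line.split(",", 2);
                if (parts.length == 2) {
                    table.put(parts[0].trim(), parts[1].trim());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        imsiToGroup = table;
    }

    /** Called per record: appends the subscriber group to the packet. */
    String map(String packet) {
        if (imsiToGroup == null) {
            throw new IllegalStateException("open() must be called before map()");
        }
        // Assumed layout: "imsi,payload"; a packet without a comma is treated as a bare IMSI.
        int comma = packet.indexOf(',');
        String imsi = comma < 0 ? packet : packet.substring(0, comma);
        return packet + "," + imsiToGroup.getOrDefault(imsi, "UNKNOWN");
    }

    public static void main(String[] args) throws IOException {
        Path csv = Files.createTempFile("subscribers", ".csv");
        Files.write(csv, Arrays.asList("001010000000001,gold", "001010000000002,silver"));
        SubscriberGroupEnricher enricher = new SubscriberGroupEnricher(csv.toString());
        enricher.open();
        System.out.println(enricher.map("001010000000001,some-payload"));
        Files.delete(csv);
    }
}
```

Because only the file path is held by the object at construction time, the lookup table itself is never part of what gets serialized with the job. Each operator instance builds its own copy when open() runs.
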
>> >
>> >Why did your previous approach fail? Do you still have the error
>> >message?
>> >
>> >Regards,
>> >Robert
>> >
>> >On Thu, Nov 5, 2015 at 3:02 PM, Kashmar, Ali <Ali.Kashmar@emc.com>
>> >wrote:
>> >
>> >> Hi Robert,
>> >>
>> >> The CSV file (or files as there will definitely be more than one) can be
>> >> large (let’s say 1 GB). Memory is not an issue though. Each node has at
>> >> least 64 GB RAM mounted. The CSV files should easily fit in the memory
>> >> of each node.
>> >>
>> >> Regards,
>> >> Ali
>> >>
>> >>
>> >>
>> >> On 2015-11-05, 6:30 AM, "Robert Metzger" <rmetzger@apache.org> wrote:
>> >>
>> >> >Hi Ali,
>> >> >
>> >> >I'm excited to hear that EMC is looking into Apache Flink. I think the
>> >> >solution to this problem depends on one question: What is the size of
>> >> >the data in the CSV file compared to the memory you have available in
>> >> >the cluster?
>> >> >Would the mapping table from the file fit into the memory of all nodes
>> >> >running Flink?
>> >> >
>> >> >Regards,
>> >> >Robert
>> >> >
>> >> >PS: Did you subscribe to the mailing list? I've CCed you in case you're
>> >> >not subscribed yet.
>> >> >
>> >> >On Wed, Nov 4, 2015 at 4:54 PM, Kashmar, Ali <Ali.Kashmar@emc.com>
>> >> >wrote:
>> >> >
>> >> >> Hi there,
>> >> >>
>> >> >> I’m trying to design and implement a use case in Flink where I’m
>> >> >> receiving protocol packets over a socket. Each packet has the
>> >> >> subscriber IMSI in it and a bunch of other data. At the same time, I
>> >> >> have a csv file with a mapping from IMSI -> subscriber group. I need
>> >> >> to inject the group into the packet and then send it to the sink.
>> >> >>
>> >> >> I’ve tried loading the CSV into a memory map and then accessing the
>> >> >> map from within the Flink operators but that only works when the CSV
>> >> >> is very small (a few hundred subscribers). I’ve tried creating
>> >> >> another stream for the CSV and connecting the streams but that
>> >> >> doesn’t yield anything as I can’t have access to objects from both
>> >> >> streams at the same time.
>> >> >>
>> >> >> How would you guys approach this?
>> >> >>
>> >> >> Thanks,
>> >> >> Ali
>> >> >>
>> >>
>> >>
>>
