flink-user mailing list archives

From Yan Chou Chen <ycchen...@gmail.com>
Subject How MapFunction gets executed?
Date Thu, 16 Jun 2016 15:07:30 GMT
A quick question. When running a streaming job that executes
DataStream.map(MapFunction) after data is read from Kafka, is a
MapFunction instance created per item, or is the number of instances
based on parallelism?

For instance, for the following code snippet

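import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new FlinkKafkaConsumer09(...))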
stream.map(new RichMapFunction[String, Unit] {

    // my AsyncHttpClient instance

    override def open(params: Configuration) { /* create my AsyncHttpClient instance, etc. */ }

    override def close() { /* close my AsyncHttpClient instance */ }

    override def map(record: String) {
        // my code
    }
})

Is a RichMapFunction created for each record (a String in the above
example)? Or, if the program sets parallelism to 4, are 4 RichMapFunction
instances created first, the data read from the Kafka consumer divided
into 4 partitions (or something similar), and map(record: String) then
called within something like a while loop? Or what is the actual flow?
Is there a place in the source code I can start from? (I traced through
StreamExecutionEnvironment / addSource / DataStream / transform /
addOperator etc., but then got lost in the source code.)
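
To make the second scenario concrete (one function instance per parallel
subtask, with open() called once, map() called once per record, and close()
called once at the end), here is a minimal plain-Scala sketch. It only
illustrates the hypothesized call order and is not Flink's actual code:
LifecycleMapper, LoggingMapper, LifecycleSketch, and run are hypothetical
names, and the partitions are processed one after another here rather than
in parallel.

```scala
// Hypothetical stand-in for a rich function; NOT one of Flink's classes.
trait LifecycleMapper[I, O] {
  def open(): Unit
  def map(record: I): O
  def close(): Unit
}

object LifecycleSketch {
  // Records lifecycle events so the call order can be inspected afterwards.
  val events = scala.collection.mutable.ArrayBuffer.empty[String]

  // One instance of this class is created per (simulated) subtask.
  class LoggingMapper(subtask: Int) extends LifecycleMapper[String, Int] {
    def open(): Unit = { events += s"open[$subtask]"; () }
    def map(record: String): Int = {
      events += s"map[$subtask]($record)"
      record.length
    }
    def close(): Unit = { events += s"close[$subtask]"; () }
  }

  // Assumed flow: for each partition, create one mapper, open it once,
  // call map for every record, and close it once, even if map throws.
  def run(partitions: Seq[Seq[String]]): Seq[Seq[Int]] =
    partitions.zipWithIndex.map { case (records, subtask) =>
      val mapper = new LoggingMapper(subtask)
      mapper.open()
      try records.toVector.map(r => mapper.map(r))
      finally mapper.close()
    }
}
```

Calling LifecycleSketch.run(Seq(Seq("a", "bb"), Seq("ccc"))) creates two
mapper instances (one per partition), and LifecycleSketch.events then shows
each instance being opened, applied to its records, and closed exactly once.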

Basically, my problem is that I have an AsyncHttpClient instance that is
opened in the open() function and closed in the close() function, following
the RichMapFunction doc. However, in some cases my AsyncHttpClient instance
is apparently not closed, and a warning like the following is displayed:

AsyncHttpClient.close() hasn't been invoked, which may produce file
descriptor leaks

Therefore, I would like to understand the life cycle so that I can close
the resource appropriately.

Thanks
