flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Decouple Kafka partitions and Flink parallelism for ordered streams
Date Wed, 11 Oct 2017 15:36:37 GMT
I couldn't find a proper solution for this. The easiest solution might
be to use the Async I/O API and do the validation with an
ExecutorService or similar in the map function.
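
In plain Java terms, that pattern amounts to fanning the heavy validation out to a thread pool while still emitting results in input order. A minimal sketch of just that idea (the class name and the validate() placeholder are mine, not a Flink API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OrderedAsyncValidation {

    // Placeholder for the heavy Avro deserialization/validation step.
    static String validate(String msg) {
        return msg.toUpperCase();
    }

    // Validate messages on a thread pool; iterating the futures in
    // submission order preserves input order even though the
    // validation itself runs concurrently.
    static List<String> process(List<String> input, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<String>> futures = new ArrayList<>();
        for (String msg : input) {
            futures.add(pool.submit(() -> validate(msg)));
        }
        List<String> out = new ArrayList<>();
        for (Future<String> f : futures) {
            out.add(f.get()); // blocks until that message is validated
        }
        pool.shutdown();
        return out;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(process(Arrays.asList("a", "b", "c"), 4));
    }
}
```

In Flink itself this would correspond to wrapping the validation in an AsyncFunction and applying it with AsyncDataStream.orderedWait, which keeps per-subtask output order.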

I've CC'd aljoscha, maybe he has another idea.

The local partitioning solution is theoretically feasible, but it will
not work with all sources and will interact oddly with
checkpoints/savepoints when the parallelism changes.

Given a source parallelism S and a map parallelism M, the idea is to
create S sub-plans, each consisting of a distinct source and M map
functions, and to ensure that each sub-plan runs together (the latter
part Flink should already take care of).

something like:

for i in S:
	source = createSeparateSource().setParallelism(1)
	partitioned = source.partitionCustom(...)
	partitions = []
	for j in M:
		partitions.append(partitioned.select(j).map(...).setParallelism(1))
This probably doesn't work with Kafka, since distinct Kafka sources
cannot cooperate in distributing partitions, AFAIK.
It also simply obliterates the concept of parallelism, which will make
modifications to the parallelism quite a pain when
checkpointing is enabled.
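
Whatever the topology ends up looking like, the local routing step itself is just a stable hash of the message key into one of the M local partitions. A sketch (LocalRouter and the key are illustrative; Math.floorMod avoids the negative index that a bare value.hashCode() % size would produce for negative hash codes):

```java
public class LocalRouter {

    // Map a visitor key to one of numMaps local partitions.
    // floorMod guards against negative hashCode values.
    static int route(String visitorKey, int numMaps) {
        return Math.floorMod(visitorKey.hashCode(), numMaps);
    }

    public static void main(String[] args) {
        // The same key always routes to the same local map instance,
        // which is what preserves per-visitor ordering.
        System.out.println(route("visitor-42", 4) == route("visitor-42", 4));
    }
}
```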

I've written a sample job that uses side outputs to do the partitioning
(since this was the first thing that came to mind); it is attached
below. Note that I essentially only wrote it to see what would
actually happen.

public static void main(String[] args) throws Exception {
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	List<DataStream<String>> sources = new ArrayList<>();
	for (int x = 0; x < 6; x++) {
		sources.add(env.addSource(new SourceFunction<String>() {
			@Override
			public void run(SourceContext<String> ctx) throws Exception {
				for (String w : WORDS) {
					ctx.collect(w);
				}
				while (true) {
					Thread.sleep(5000);
				}
			}

			@Override
			public void cancel() {
			}
		}));
	}

	int numMaps = 4;
	for (int sourceIndex = 0; sourceIndex < sources.size(); sourceIndex++) {
		DataStream<String> source = sources.get(sourceIndex);

		List<OutputTag<String>> tags = new ArrayList<>(numMaps);
		for (int x = 0; x < numMaps; x++) {
			tags.add(new OutputTag<String>(sourceIndex + "-" + x) {});
		}

		SingleOutputStreamOperator<String> partitioned = source.process(new ProcessFunction<String, String>() {
			@Override
			public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
				// floorMod avoids a negative index for negative hash codes
				ctx.output(tags.get(Math.floorMod(value.hashCode(), tags.size())), value);
			}
		});

		List<DataStream<String>> toUnion = new ArrayList<>(tags.size());
		for (OutputTag<String> tag : tags) {
			toUnion.add(partitioned.getSideOutput(tag)
				.map(new MapFunction<String, String>() {
					@Override
					public String map(String value) throws Exception {
						return tag.toString() + " - " + value;
					}
				}).disableChaining());
		}

		DataStream<String> unionBase = toUnion.remove(0);
		unionBase = unionBase.union(toUnion.toArray(new DataStream[0]));
		unionBase.print();
	}

	// execute program
	env.execute("Theory");
}
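
Conceptually, the ProcessFunction in the job above only buckets each element under one of the M tags named sourceIndex + "-" + x. The same routing in isolation (SideOutputBuckets is an illustrative name, not part of the job):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SideOutputBuckets {

    // Emulates the side-output routing: each message lands in exactly one
    // of numMaps buckets, chosen by a stable hash of its value.
    static Map<String, List<String>> bucket(int sourceIndex, int numMaps, List<String> messages) {
        Map<String, List<String>> buckets = new LinkedHashMap<>();
        for (int x = 0; x < numMaps; x++) {
            buckets.put(sourceIndex + "-" + x, new ArrayList<>());
        }
        for (String msg : messages) {
            String tag = sourceIndex + "-" + Math.floorMod(msg.hashCode(), numMaps);
            buckets.get(tag).add(msg);
        }
        return buckets;
    }
}
```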

On 11.10.2017 16:31, Chesnay Schepler wrote:
> It is correct that keyBy and partition operations will distribute 
> messages over the network
> as they distribute the data across all subtasks. For this use-case we 
> only want to consider
> subtasks that are subsequent to our operator, like a local keyBy.
> I don't think there is an obvious way to implement it, but I'm 
> currently theory-crafting a bit
> and will get back to you.
> On 11.10.2017 14:52, Sanne de Roever wrote:
>> Hi,
>> Currently we need 75 Kafka partitions per topic and a parallelism of 
>> 75 to meet the required performance; increasing the partitions and 
>> parallelism gives diminishing returns.
>> Current performance is approx. 1500 msg/s per core, with one 
>> pipeline (source, map, sink) deployed as one instance per core.
>> The Kafka source performance is not an issue. The map is very heavy 
>> (deserialization, validation) on rather complex Avro messages. Object 
>> reuse is enabled.
>> Ideally we would like to decouple Flink processing parallelism from 
>> Kafka partitions in a following manner:
>>   * Pick a source parallelism
>>   * Per source, be able to pick a parallelism for the following map
>>   * In such a way that some message key determines which -local- map
>>     instance gets a message from a certain visitor
>>   * So that messages with the same visitor key get processed by the
>>     same map and in order for that visitor
>>   * Output the result to Kafka
>> AFAIK keyBy, partitionCustom will distribute messages over the 
>> network and rescale has no affinity for message identity.
>> Am I missing something obvious?
>> Cheers,
>> Sanne
