flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ron Crocker <rcroc...@newrelic.com>
Subject Making external calls from a FlinkKafkaPartitioner
Date Fri, 03 Nov 2017 03:06:02 GMT
We have a system where the Kafka partition a message should go into is a function of a value
in the message. Often, it’s value % # partitions, but for some values it’s not - it’s
a specified list of partitions that changes over time. Our “simple Java library” that
produces messages for this system also has a background thread that periodically polls a HTTP
endpoint (at a rate of 1/minute as its default) to refresh that list of special cases.

It’s easy to create a FlinkKafkaPartitioner that does the mod operation; what I’m not
so sure about is how to get this polling operation into the partitioner. I’m about to try
it the obvious way (create a background thread that polls the URL and updates the partition
map), but I wonder if that’s actually going to cause a bunch of problems for the Flink runtime.

Here’s the code that I have right now:
public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>>
    private final String partitionerURL;
    private final long updateIntervalInMillis;
    private Map<Long, List<Integer>> partitionMap;
    private ScheduledExecutorService executor;

    public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
        this.partitionerURL = partitionerURL;
        this.updateIntervalInMillis = updateIntervalInMillis;
        this.partitionMap = new HashMap<>();

    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
        executor = Executors.newScheduledThreadPool(1);
                () -> updatePartitionMapRunnable(),


    private void updatePartitionMapRunnable() {
        // Make synchronous request to partitionerURL
        // This is a simple JSON that matches our data
        String response = "{1:[1,2,3],2:[2]}";
        // Replace current partitionMap with new HashMap from the response
        this.partitionMap = convertResponseToMap(response); 
        // Replacing the current value of partitionMap with the updated version doesn't
        // require synchronization

    private Map<Long, List<Integer>> convertResponseToMap(String response) {
        Map<Long, List<Integer>> hashMap = new HashMap<>();
        // Convert response to JSON structure and just use that?
        // or Iterate and add to local hashMap
        return hashMap;

    public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue,
int numPartitions) {
        long myKey = next.f0;
        if (partitionMap.containsKey(myKey)) {
            List<Integer> partitions = partitionMap.get(myKey);
            myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
        return (int)(myKey % numPartitions);
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
M: +1 630 363 8835

View raw message