flink-user mailing list archives

From "Katsipoulakis, Nikolaos Romanos" <kat...@cs.pitt.edu>
Subject RE: When is the StreamPartitioner<T> selectChannel() method called
Date Wed, 25 Jan 2017 15:33:12 GMT
Hello Ufuk, 

First, thank you very much for your quick reply and for clarifying the difference between
channels and slots. As for debugging and hitting the breakpoint inside the HashPartitioner:
I am using the IntelliJ IDE, and I have set up the environment as a Maven project with the
following dependencies:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.1.4</version>
        </dependency>

Therefore, I assume that my local environment is using Flink 1.1. In addition, I set the parallelism
of the operation after the keyBy() transformation to 8, so that 8 sub-tasks run concurrently.
Unfortunately, a breakpoint inside HashPartitioner's selectChannel() is still not reached. Am I doing
something wrong?

Kind Regards,

Nikos R. Katsipoulakis, 
Department of Computer Science
University of Pittsburgh

-----Original Message-----
From: Ufuk Celebi [mailto:uce@apache.org] 
Sent: Wednesday, January 25, 2017 10:06 AM
To: user@flink.apache.org
Subject: Re: When is the StreamPartitioner<T> selectChannel() method called

Hey Nikos,

Slots are only relevant for scheduling tasks. The number of outgoing channels depends on the
number of parallel subtasks that consume a produced intermediate result stream, say the result
of a source operator. If you have a job with a simple source->keyBy->map flow with parallelism
X you will have X outgoing channels at the source operator, one for each consuming map subtask.
This is what is exposed by the underlying channel selector.
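
To make the channel count concrete, here is a minimal plain-Java sketch of the idea. It is
not Flink code: the class ChannelSelectionSketch and its selectChannel() helper are
hypothetical stand-ins, and the plain hashCode() modulo is an assumption chosen for
illustration (the real partitioner may hash differently). The point it shows is only that
the selector picks an index in [0, X), where X is the parallelism of the consuming operator.

```java
import java.util.Arrays;

// Illustrative model only; names are hypothetical and this is not Flink's implementation.
final class ChannelSelectionSketch {

    // Maps a key to one of the outgoing channels. There is one channel
    // per parallel subtask of the consuming operator.
    static int selectChannel(Object key, int numberOfOutputChannels) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numberOfOutputChannels);
    }

    public static void main(String[] args) {
        int consumerParallelism = 8; // parallelism X of the map after keyBy()
        int[] recordsPerChannel = new int[consumerParallelism];

        String[] keys = {"a", "b", "c", "a", "b", "a"};
        for (String key : keys) {
            // The same key always lands on the same channel.
            recordsPerChannel[selectChannel(key, consumerParallelism)]++;
        }
        System.out.println(Arrays.toString(recordsPerChannel));
    }
}
```

Changing consumerParallelism changes the number of channels; the number of slots on the
TaskManager does not appear anywhere in the selection.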

For 1.1 the mentioned HashPartitioner should be called as you describe. In 1.2 this has been
replaced by the KeyGroupStreamPartitioner.
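
As a rough sketch of what changes with key groups: a key is first mapped to a key group
(fixed by the maximum parallelism), and the key group is then mapped to a consuming
subtask, i.e. a channel. The plain-Java model below is an assumption-laden illustration,
not the Flink source: the class and method names are hypothetical, the plain hashCode()
stands in for whatever hash Flink applies, and the value 128 for the maximum parallelism
is just an example.

```java
// Illustrative model only; names are hypothetical and this is not Flink's implementation.
final class KeyGroupSketch {

    // Step 1: key -> key group. Depends only on maxParallelism,
    // so it stays stable when the job is rescaled.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        // Assumption: plain hashCode(); the real code may apply an extra hash.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Step 2: key group -> index of the consuming subtask (the channel).
    // Contiguous ranges of key groups go to the same subtask.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // example value
        int parallelism = 8;      // number of consuming subtasks, i.e. channels

        for (String key : new String[] {"a", "b", "c"}) {
            int keyGroup = assignToKeyGroup(key, maxParallelism);
            int channel = operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            System.out.println(key + " -> key group " + keyGroup + " -> channel " + channel);
        }
    }
}
```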

That the HashPartitioner method is not called is probably due to the fact that you are debugging
this remotely. Have you tried it from within your local IDE, too?

– Ufuk


On Wed, Jan 25, 2017 at 2:12 PM, Katsipoulakis, Nikolaos Romanos <katsip@cs.pitt.edu>
wrote:
> Hello all,
>
>
>
> I have been looking into different StreamPartitioner<T> 
> implementations of Flink, and I noticed they come with an 
> implementation of selectChannel(), as defined in the 
> ChannelSelector<T> interface. In order to understand better the 
> actions of a StreamPartitioner during execution, I set up Flink on a 
> single server with one TaskManager that had 16 slots. Then, I 
> submitted a job, with a HashPartitioner (through a keyBy() 
> transformation), and remote debugged it to see when the 
> HashPartitioner’s selectChannel() method is called. Unfortunately, the 
> breakpoint is never reached and the job completes successfully. Is the 
> previous behavior normal? If yes, why is the breakpoint never reached? 
> Does it have to do with running the job in an environment with local 
> slots? Also, what determines the number of channels when a job is 
> executed? Does it have to do with the number of available slots in 
> the downstream operation of the partitioner?
>
>
>
> Thank you for your time and I appreciate any answers/comments/indications.
>
>
>
> Kind Regards.
>
>
>
> Nikos R. Katsipoulakis
>
> Department of Computer Science,
>
> University of Pittsburgh
>
>