flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ufuk Celebi <...@apache.org>
Subject Re: Questions re: ExecutionGraph & ResultPartitions for interactive use a la Spark
Date Thu, 14 Jan 2016 10:52:50 GMT
Hey Kovas

sorry for the long delay.

> On 10 Jan 2016, at 06:20, kovas boguta <kovas.boguta@gmail.com> wrote:
> 1) How can I prevent ResultPartitions from being released?
> In interactive use, RPs should not necessarily be released when there are no pending
tasks to consume them. 

Max Michels did some work along those lines in a by now very out dated pull request: https://github.com/apache/flink/pull/640

> Looking at the code, its hard to understand the high-level logic of who triggers their
release, how the refcounting works, etc. For instance, is releasePartitionsProducedBy called
by the producer, or the consumer, or both? Is the refcount initialized at the beginning of
the task setup, and decremented every time its read out? 

I fully agree that the respective part of the system is very much under documented. Sorry
about that.

- ResultPartitionManager: the result partition manager keeps track of all partitions of a
task manager. Each task registers the produced partitions when it is instantiated (see Task
constructor and NetworkEnvironment#registerTask). This is the final truth about which partitions
are available etc.

- releasePartitionsProducedBy: This is not part of the regular life cycle of the results,
but only triggered by the job manager to get rid of old results in case of cancellation/failure.

- Ref counts: The release of the results during normal operation happens via the ref counts.
Currently, the ref counts are always initialised to the number of sub partitions (A result
partition consists of 1 or more sub partitions for each parallel receiver of the result).
Decrementing happens when a sub partition has been fully consumed (via ResultPartition#onConsumedSubpartition).
And as soon as the ref count reaches 0, they are released. The final release happens in the

The behaviour that would need to change is the last step in the result partition manager imo.
I think #640 has some modifications in that respect, which might be helpful in figuring out
the details. I think what can happen instead of the final release is that a result becomes
“cached” and stay around.

> Ideally, I could force certain ResultPartitions to only be manually releasable, so I
can consume them over and over.  

How would you like to have this controlled by the user? Offer a API operation like `.pin()`?
Do you need it pinned permanently until you release it or would it be ok to just cache it
and maybe recompute if another task needs more memory/disk space?

> 2) Can I call attachJobGraph on the ExecutionGraph after the job has begun executing
to add more nodes?
> I read that Flink does not support changing the running the topology. But what about
extending the topology to add more nodes?

I think that should be possible. The problem at the moment is that this will recompute everything
from the sources. Your suggestion makes sense and actually this was one of the motivations
for #640. The newly attached nodes would back track to the produced results of the initial
topology and go on from there.

> If the IntermediateResultPartition are just sitting around from previously completely
tasks, this seems straightforward in principle.. would one have to fiddle with the scheduler/event
system to kick things off again?

I think it will be possible to just submit the new parts. But I’m not sure about the details.

> 3) Can I have a ConnectionManager without a TaskManager?
> For interactive use, I want to selectively pull data from ResultPartitions into my local
REPL process. But I don't want my local process to be a TaskManager node, and have tasks assigned
to it. 
> So this question boils down to, how to hook a ConnectionManager into the Flink communication/actor

In theory it should be possible, yes. In practice I see problems with setting up all the parameters.

You have to instantiate a NettyConnectionManager. For the start method, only the NetworkBufferPool
is relevant, which is easy to instantiate as well. This will take part of the network transfers.

To get the data you need a SingleInputGate and set it up with RemoteChannel instances for
each consumed subpartition. This is where you need to know all the partition IDs and task
managers (The connection ID is a wrapper around InetSocketAddress with a connection index
for connection sharing).

When you have the gate setup, you can query it with the RecordReader. The input gate itself
just returns byte buffers.

Do you have an idea about how you want to figure out the partition and connection IDs in your
repl process? If yes, I could provide some more concrete code snippets on the setup.


If would like to contribute to Flink, we can also think about splitting this up into some
smaller tasks and address them. Your ideas are definitely in line with what we wanted to have
in Flink anyways. The recent focus on the streaming part of the system has pulled most contributors
away from the batch parts though. I would suggest to also look at the changes in #640. Although
the PR is rather old, the network stack has not seen many changes in recent times.

Feel free to post further questions! :)

– Ufuk

View raw message