flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject [DISCUSS] FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime
Date Tue, 27 Jun 2017 17:21:22 GMT
Hi all!

I would like to propose the following FLIP:

FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:

The FLIP is motivated by the fact that many users run into an unnecessary
kind of performance problem caused by an old design artifact.

The required change should be reasonably small, and would help many users
and Flink's general standing.

Happy to hear thoughts!



FLIP text is below. Pictures with illustrations are only in the Wiki, not
supported on the mailing list.


The default behavior of the streaming runtime is to copy every element
between chained operators.

That operation was introduced for “safety” reasons, to avoid the number of
cases where users can create incorrect programs by reusing mutable objects
(a discouraged pattern, but possible). For example when using state
backends that keep the state as objects on heap, reusing mutable objects
can theoretically create cases where the same object is used in multiple
state mappings.

The effect is that many people that try Flink get much lower performance
than they could possibly get. From empirical evidence, almost all users
that I (Stephan) have been in touch with eventually run into this issue

There are multiple observations about that design:


   Object copies are extremely costly. While some simple copy virtually for
   free (types reliably detected as immutable are not copied at all), many
   real pipelines use types like Avro, Thrift, JSON, etc, which are very
   expensive to copy.


   Keyed operations currently only occur after shuffles. The operations are
   hence the first in a pipeline and will never have a reused object anyways.
   That means for the most critical operation, this pre-caution is unnecessary.


   The mode is inconsistent with the contract of the DataSet API, which
   does not copy at each step


   To prevent these copies, users can select {{enableObjectReuse()}}, which
   is misleading, since it does not really reuse mutable objects, but avoids
   additional copies.



I propose to change the default behavior of the DataStream runtime to be
the same as the DataSet runtime. That means that new objects are chosen on
every deserialization, and no copies are made as the objects are passed on
along the pipelines.


I propose to drop the execution config flag {{objectReuse}} and instead
introduce an {{ObjectReuseMode}} enumeration with better control of what
should happen. There will be three different types:



      This is the default in the DataSet API

      This will become the default in the DataStream API

      This happens in the DataStream API when {{enableObjectReuse()}} is



      The current default in the DataStream API



      This happens in the DataSet API when {{enableObjectReuse()}} is

An illustration of the modes is as follows:


See here:


See here:


See here:
New or Changed Public Interfaces

Interfaces changed

The interface of the {{ExecutionConfig}} add the method
{{setObjectReuseMode(ObjectReuseMode)}}, and deprecates the methods
{{enableObjectReuse()}} and {{disableObjectReuse()}}.

Behavior changed

The default object passing behavior changes, meaning that it can affect the
correctness of prior DataStream programs that assume the original

Migration Plan and Compatibility


No interface migration path is needed, because the interfaces are not
broken, merely some methods get deprecated.

Behavior change

Variant 1:


   Change the behavior, make it explicit on the release notes that we did
   that and what cases are affected.

   This may actually be feasible, because the cases that are affected are
   quite pathological corner cases that only very bad implementations should
   encounter (see below)

Variant 2:


   When users set the mode, always that mode is used.

   When the mode is not explicitly set, we follow that strategy:

      Change the CLI such that we know when users upgrade existing jobs
      (the savepoint to start from has a version prior to 1.4).

      Use DEFAULT as the default for jobs that do not start from savepoint,
      or that start from savepoint >= 1.4

      Use COPY_PER_OPERATOR as the default for upgraded jobs

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message