flink-user mailing list archives

From Jakub Danilewicz <jdanilew...@alto-analytics.com>
Subject Re: Flink 1.5+ performance in a Java standalone environment
Date Fri, 01 Nov 2019 12:07:38 GMT
Thanks for your reply, Till.

As mentioned earlier, I execute graph processing in a plain standalone Java environment
(no cluster underneath, no specific configuration except for parallelism), just as if you
simply ran the Java class I pasted upthread with a Flink distribution JAR (plus the Gelly and
Slf4j/Log4j JARs) on its classpath.

I do not know what goes on behind the scenes, but the "legacy" mode significantly outperforms
the "new" one in every single case. The new mode is a few times slower, and the gap widens
as the graph grows.

As for setting "the maximum parallelism (== number of key groups) to a multiple of your parallelism",
could you tell me which configuration option in the list below that is?

https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html 

Best,

Jakub


On 2019/11/01 10:19:47, Till Rohrmann <trohrmann@apache.org> wrote: 
> Hi Jakub,
> 
> what are the cluster settings and the exact job settings you are running
> your job with? I'm asking because one difference between the legacy and FLIP-6
> modes is that the legacy mode spreads tasks out across all available
> TaskManagers, whereas the FLIP-6 mode tries to bin-pack them onto as few
> TaskManagers as possible. If you have more slots than the parallelism of
> your job, I could see how this might affect its performance if the job is
> CPU bound rather than I/O bound. We will add an option to re-enable
> the old spread-out strategy [1].
> 
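To make the difference between the two placement strategies described above concrete, here is a toy model in plain Java. It is not Flink's scheduler code: the class and method names are hypothetical, and it assumes a simple round-robin assignment for the legacy "spread out" behaviour and a fill-one-TaskManager-first assignment for FLIP-6 bin packing. The two strategies only differ when there are more slots than tasks.

```java
import java.util.Arrays;

/**
 * Toy model of slot placement, NOT Flink's scheduler: compares spreading tasks
 * across TaskManagers with packing them onto as few TaskManagers as possible.
 */
final class SlotPlacementSketch {

    /** Legacy-style (assumed round-robin): task i goes to TaskManager i % numTaskManagers. */
    static int[] spreadOut(int parallelism, int numTaskManagers, int slotsPerTaskManager) {
        checkCapacity(parallelism, numTaskManagers, slotsPerTaskManager);
        int[] tasksPerTaskManager = new int[numTaskManagers];
        for (int task = 0; task < parallelism; task++) {
            tasksPerTaskManager[task % numTaskManagers]++;
        }
        return tasksPerTaskManager;
    }

    /** FLIP-6-style (assumed): fill all slots of TaskManager 0, then TaskManager 1, and so on. */
    static int[] binPack(int parallelism, int numTaskManagers, int slotsPerTaskManager) {
        checkCapacity(parallelism, numTaskManagers, slotsPerTaskManager);
        int[] tasksPerTaskManager = new int[numTaskManagers];
        for (int task = 0; task < parallelism; task++) {
            tasksPerTaskManager[task / slotsPerTaskManager]++;
        }
        return tasksPerTaskManager;
    }

    private static void checkCapacity(int parallelism, int numTaskManagers, int slotsPerTaskManager) {
        if (parallelism > numTaskManagers * slotsPerTaskManager) {
            throw new IllegalArgumentException("not enough slots for the requested parallelism");
        }
    }

    public static void main(String[] args) {
        // Hypothetical cluster: 4 TaskManagers with 4 slots each, job parallelism 4.
        System.out.println("spread:   " + Arrays.toString(spreadOut(4, 4, 4)));
        System.out.println("bin-pack: " + Arrays.toString(binPack(4, 4, 4)));
    }
}
```

With this hypothetical cluster, `main` prints `[1, 1, 1, 1]` for spreading and `[4, 0, 0, 0]` for bin packing, i.e. all four CPU-bound tasks would compete for the CPU of a single TaskManager.
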
> Another reason why you might see a performance degradation is the placement
> of key groups. In the legacy mode, Flink distributed them so that two
> TaskManagers with the same number of tasks would differ by at most one key
> group. In FLIP-6 mode, the difference can be as large as the number of slots
> per TaskManager. To mitigate this problem, I would recommend setting the
> maximum parallelism (== number of key groups) to a multiple of your
> parallelism.
> 
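The key-group argument above can be checked with a little arithmetic. The sketch below is plain Java with hypothetical names; it assumes that key group `keyGroupId` is mapped to subtask `keyGroupId * parallelism / maxParallelism` (the formula in Flink's `KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup`, reimplemented here so it runs without Flink) and uses a made-up placement of 6 subtasks onto 2 TaskManagers with 3 slots each.

```java
import java.util.Arrays;

/** Counts key groups per subtask and per TaskManager (toy model, no Flink dependency). */
final class KeyGroupSketch {

    /** Assumed to mirror KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    /** Number of the maxParallelism key groups owned by each parallel subtask. */
    static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
        if (parallelism <= 0 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("need 0 < parallelism <= maxParallelism");
        }
        int[] counts = new int[parallelism];
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            counts[operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup)]++;
        }
        return counts;
    }

    /** Sums the key groups of the subtasks placed on each TaskManager. */
    static int[] keyGroupsPerTaskManager(int[] perSubtask, int[] taskManagerOfSubtask, int numTaskManagers) {
        int[] totals = new int[numTaskManagers];
        for (int subtask = 0; subtask < perSubtask.length; subtask++) {
            totals[taskManagerOfSubtask[subtask]] += perSubtask[subtask];
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical placement: subtask i runs on TaskManager placement[i].
        int[] placement = {0, 1, 1, 0, 1, 0};
        // 128 is not a multiple of 6, so subtasks own either 21 or 22 key groups.
        int[] uneven = keyGroupsPerSubtask(128, 6);
        // 132 = 6 * 22, so every subtask owns exactly 22 key groups.
        int[] even = keyGroupsPerSubtask(132, 6);
        System.out.println(Arrays.toString(uneven) + " -> "
                + Arrays.toString(keyGroupsPerTaskManager(uneven, placement, 2)));
        System.out.println(Arrays.toString(even) + " -> "
                + Arrays.toString(keyGroupsPerTaskManager(even, placement, 2)));
    }
}
```

With maxParallelism 128, `main` prints `[22, 21, 21, 22, 21, 21] -> [65, 63]`: two TaskManagers running three tasks each end up with different amounts of keyed work. With 132 every subtask owns 22 key groups, so any placement gives both TaskManagers 66, which is the point of choosing a multiple of the parallelism.
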
> [1] https://issues.apache.org/jira/browse/FLINK-12122
> 
> Cheers,
> Till
> 
> On Wed, Oct 30, 2019 at 4:28 PM Jakub Danilewicz <jdanilewicz@alto-analytics.com> wrote:
> 
> > Hi,
> >
> > I can confirm that the performance drop is directly related to FLIP-6
> > changes. Applying this modification to the code posted above restores the
> > previous graph processing speed under Flink 1.5.6:
> >
> > ---------------------------------------------------------------------------
> >
> >     // Flink 1.5.x: run the local environment in the pre-FLIP-6 ("legacy") mode
> >     org.apache.flink.configuration.Configuration customConfig =
> >         new org.apache.flink.configuration.Configuration();
> >     customConfig.setString("mode", "legacy");
> >     final ExecutionEnvironment env =
> >         ExecutionEnvironment.createLocalEnvironment(customConfig);
> >     env.setParallelism(parallelism);
> >
> > ---------------------------------------------------------------------------
> >
> > Disabling the "taskmanager.network.credit-model" parameter in the
> > Configuration gives only a very slight performance improvement under
> > Flink 1.5.6.
> >
> > Now the big question: what about newer versions, where the legacy mode is
> > no longer supported? I checked Flink 1.8.2, and it does not work there.
> >
> > Is there any way to make the new mode as performant as the "legacy" one in
> > standalone scenarios? Alternatively, can we expect improvements in this
> > area in upcoming releases?
> >
> > Best,
> >
> > Jakub
> >
> > On 2019/10/30 14:11:19, Piotr Nowojski <piotr@ververica.com> wrote:
> > > Hi,
> > >
> > > In Flink 1.5 there were three big changes that could affect performance:
> > > 1. FLIP-6 changes (as Yang and Fabian mentioned previously)
> > > 2. Credit-based flow control (especially if you are using SSL)
> > > 3. Low latency network changes
> > >
> > > I would suspect them in that order. You can disable the first and the
> > > second via configuration switches [1] and [2], respectively.
> > >
> > > [1] "mode: legacy"
> > > https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
> > > [2] "taskmanager.network.credit-model: false"
> > >
> > > Could you try disabling them?
> > >
> > > Piotrek
> > >
> > > > On 28 Oct 2019, at 14:10, Jakub Danilewicz <jdanilewicz@alto-analytics.com> wrote:
> > > >
> > > > Thanks for your replies.
> > > >
> > > > We use Flink from within a standalone Java 8 application (no Hadoop,
> > > > no clustering), so it basically boils down to running simple code like
> > > > this:
> > > >
> > > > import java.util.*;
> > > > import org.apache.flink.api.java.ExecutionEnvironment;
> > > > import org.apache.flink.graph.*;
> > > > import org.apache.flink.graph.library.CommunityDetection;
> > > >
> > > > public class FlinkTester {
> > > >    // Fixed seed, so every run generates the same random graph
> > > >    final Random random = new Random(1);
> > > >    // Ratio of edges to vertices, used to size the vertex ID space
> > > >    final float density = 3.0F;
> > > >
> > > >    public static void main(String[] args) throws Exception {
> > > >        new FlinkTester().execute(1000000, 4);
> > > >    }
> > > >
> > > >    private void execute(int numEdges, int parallelism) throws Exception {
> > > >        final ExecutionEnvironment env =
> > > >                ExecutionEnvironment.createLocalEnvironment(parallelism);
> > > >        final Graph<Long, Long, Double> graph = createGraph(numEdges, env);
> > > >
> > > >        // collect() triggers execution, so this measures the complete Flink job
> > > >        final long start = System.currentTimeMillis();
> > > >        List<Vertex<Long, Long>> vertices =
> > > >                graph.run(new CommunityDetection<Long>(10, 0.5)).getVertices().collect();
> > > >        System.out.println(vertices.size() + " vertices processed in "
> > > >                + (System.currentTimeMillis() - start) / 1000 + " s");
> > > >    }
> > > >
> > > >    private Graph<Long, Long, Double> createGraph(int numEdges, ExecutionEnvironment env) {
> > > >        System.out.println("Creating new graph of " + numEdges + " edges...");
> > > >
> > > >        final int maxNumVertices = (int) (numEdges / density);
> > > >        final Map<Long, Vertex<Long, Long>> vertexMap = new HashMap<>(maxNumVertices);
> > > >        final Map<String, Edge<Long, Double>> edgeMap = new HashMap<>(numEdges);
> > > >
> > > >        // Draw random directed edges (no self-loops, no duplicates) until numEdges exist
> > > >        while (edgeMap.size() < numEdges) {
> > > >            long sourceId = random.nextInt(maxNumVertices) + 1;
> > > >            long targetId = sourceId;
> > > >            while (targetId == sourceId)
> > > >                targetId = random.nextInt(maxNumVertices) + 1;
> > > >
> > > >            final String edgeKey = sourceId + "#" + targetId;
> > > >            if (!edgeMap.containsKey(edgeKey)) {
> > > >                edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
> > > >                // Each vertex starts with its own ID as its value
> > > >                if (!vertexMap.containsKey(sourceId))
> > > >                    vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId));
> > > >                if (!vertexMap.containsKey(targetId))
> > > >                    vertexMap.put(targetId, new Vertex<>(targetId, targetId));
> > > >            }
> > > >        }
> > > >
> > > >        System.out.println(edgeMap.size() + " edges created between "
> > > >                + vertexMap.size() + " vertices.");
> > > >        return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env);
> > > >    }
> > > > }
> > > >
> > > > No matter which graph algorithm you pick for benchmarking (above it's
> > > > CommunityDetection), the bigger the graph, the wider the performance gap
> > > > (and the higher the CPU/memory consumption) you observe when comparing the
> > > > execution times of the old engine (<= Flink 1.4.2) and the new one (checked
> > > > on 1.5.6, 1.8.2 and 1.9.1).
> > > >
> > > > Just run the code yourselves (you may play with the number of edges and
> > > > parallel threads).
> > > >
> > > > Best,
> > > >
> > > > Jakub
> > > >
> > >
> > >
> >
> 
