flink-user mailing list archives

From Jakub Danilewicz <jdanilew...@alto-analytics.com>
Subject Re: Flink 1.5+ performance in a Java standalone environment
Date Mon, 28 Oct 2019 13:10:34 GMT
Thanks for your replies.

We use Flink from within a standalone Java 8 application (no Hadoop, no clustering), so it
basically boils down to running simple code like this:

import java.util.*;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.*;
import org.apache.flink.graph.library.CommunityDetection;

public class FlinkTester {
    final Random random = new Random(1);
    final float density = 3.0F;

    public static void main(String[] args) throws Exception {
        new FlinkTester().execute(1000000, 4);
    }

    private void execute(int numEdges, int parallelism) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
        final Graph<Long, Long, Double> graph = createGraph(numEdges, env);

        final long start = System.currentTimeMillis();
        List<Vertex<Long, Long>> vertices = graph.run(new CommunityDetection<Long>(10, 0.5)).getVertices().collect();
        System.out.println(vertices.size() + " vertices processed in " + (System.currentTimeMillis() - start) / 1000 + " s");
    }

    private Graph<Long, Long, Double> createGraph(int numEdges, ExecutionEnvironment env) {
        System.out.println("Creating new graph of " + numEdges + " edges...");

        final int maxNumVertices = (int)(numEdges/density);
        final Map<Long, Vertex<Long, Long>> vertexMap = new HashMap<>(maxNumVertices);
        final Map<String, Edge<Long, Double>> edgeMap = new HashMap<>(numEdges);

        while (edgeMap.size() < numEdges) {
            long sourceId = random.nextInt(maxNumVertices) + 1;
            long targetId = sourceId;
            while (targetId == sourceId)
                targetId = random.nextInt(maxNumVertices) + 1;

            final String edgeKey = sourceId + "#" + targetId;
            if (!edgeMap.containsKey(edgeKey)) {
                edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
                if (!vertexMap.containsKey(sourceId))
                    vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId));
                if (!vertexMap.containsKey(targetId))
                    vertexMap.put(targetId, new Vertex<>(targetId, targetId));
            }
        }

        System.out.println(edgeMap.size() + " edges created between " + vertexMap.size() + " vertices.");
        return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env);
    }
}
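For context on what the generator above produces: vertex ids are drawn from the range 1..numEdges/density, so the number of distinct vertices is capped at numEdges/density and the average degree works out to roughly 2 * density. The standalone sketch below (my own illustration, not part of the original test code) makes that math explicit:

```java
// Illustration only: the size math implied by the random-graph generator.
// With density = 3.0 and 1,000,000 edges, at most 333,333 distinct vertices
// can appear, giving an average degree of about 6.
public class GraphSizeSketch {

    // Mirrors the cast in createGraph(): int/float division, truncated to int.
    static int maxNumVertices(int numEdges, float density) {
        return (int) (numEdges / density);
    }

    // Each edge contributes two endpoint slots, so avg degree = 2E / V.
    static double expectedAvgDegree(int numEdges, int numVertices) {
        return 2.0 * numEdges / numVertices;
    }

    public static void main(String[] args) {
        int v = maxNumVertices(1_000_000, 3.0F);
        System.out.println(v + " possible vertices, avg degree ~"
                + expectedAvgDegree(1_000_000, v));
    }
}
```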

No matter which graph algorithm you pick for benchmarking (above it's CommunityDetection), the
bigger the graph, the wider the performance gap (and the higher the CPU/memory consumption) you
observe when comparing execution times between the old engine (<= Flink 1.4.2) and the new
one (checked on 1.5.6, 1.8.2 and 1.9.1).
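One caveat when comparing versions this way: a single System.currentTimeMillis() measurement on the JVM includes JIT warm-up and GC noise. A small harness like the sketch below (my own illustration, not code from this thread; the names are made up) could make the 1.4.2 vs 1.5+ numbers more robust by discarding warm-up runs and reporting a median:

```java
import java.util.Arrays;

// Illustration only: a minimal JVM micro-benchmark harness.
// Wrap the timed section, e.g. the graph.run(...).collect() call,
// in the Runnable passed to medianMillis().
public class BenchmarkSketch {

    // Runs the task untimed a few times (JIT warm-up), then returns the
    // median wall-clock time in milliseconds over the measured runs.
    static long medianMillis(Runnable task, int warmups, int runs) {
        for (int i = 0; i < warmups; i++) {
            task.run();
        }
        long[] samples = new long[runs];
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            task.run();
            samples[i] = (System.nanoTime() - start) / 1_000_000L;
        }
        return median(samples);
    }

    static long median(long[] samples) {
        long[] copy = samples.clone();
        Arrays.sort(copy);
        return copy[copy.length / 2];
    }

    public static void main(String[] args) {
        // Trivial stand-in task; replace with the Flink job under test.
        long ms = medianMillis(() -> {
            long s = 0;
            for (int i = 0; i < 1_000_000; i++) s += i;
        }, 2, 5);
        System.out.println("median: " + ms + " ms");
    }
}
```

The median is less sensitive to a single GC pause than the mean, which matters when individual runs take seconds and heap pressure differs between Flink versions.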

Just run the code yourselves (you can play with the number of edges and the parallelism).

Best,

Jakub

