From commits-return-15260-archive-asf-public=cust-asf.ponee.io@flink.apache.org Tue Jan 9 16:13:57 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id D581018076D for ; Tue, 9 Jan 2018 16:13:56 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C5E99160C13; Tue, 9 Jan 2018 15:13:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DB236160C3F for ; Tue, 9 Jan 2018 16:13:54 +0100 (CET) Received: (qmail 57040 invoked by uid 500); 9 Jan 2018 15:13:54 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 57013 invoked by uid 99); 9 Jan 2018 15:13:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Jan 2018 15:13:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9807AE01EC; Tue, 9 Jan 2018 15:13:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: srichter@apache.org To: commits@flink.apache.org Date: Tue, 09 Jan 2018 15:13:53 -0000 Message-Id: In-Reply-To: <69bb87af462349bba4234b71c7886ff2@git.apache.org> References: <69bb87af462349bba4234b71c7886ff2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/6] flink git commit: [FLINK-8252][benchmarks] convert network benchmarks to streaming benchmarks [FLINK-8252][benchmarks] convert network benchmarks to streaming benchmarks This allows us to use the output flushing interval as a parameter to evaluate, too. This closes #5259. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/544c9703 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/544c9703 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/544c9703 Branch: refs/heads/master Commit: 544c9703d97668b8d4a952501756db52156ff2ef Parents: 6cfb758 Author: Nico Kruber Authored: Thu Dec 14 17:30:19 2017 +0100 Committer: Stefan Richter Committed: Tue Jan 9 16:10:51 2018 +0100 ---------------------------------------------------------------------- .../benchmark/LongRecordWriterThread.java | 94 ------- .../benchmark/NetworkBenchmarkEnvironment.java | 278 ------------------- .../benchmark/NetworkThroughputBenchmark.java | 90 ------ .../NetworkThroughputBenchmarkTests.java | 74 ----- .../io/network/benchmark/ReceiverThread.java | 98 ------- .../benchmark/SerializingLongReceiver.java | 57 ---- .../io/benchmark/LongRecordWriterThread.java | 94 +++++++ .../runtime/io/benchmark/ReceiverThread.java | 98 +++++++ .../io/benchmark/SerializingLongReceiver.java | 57 ++++ .../StreamNetworkBenchmarkEnvironment.java | 257 ++++++++++++++++- .../StreamNetworkPointToPointBenchmark.java | 3 +- .../StreamNetworkThroughputBenchmark.java | 90 ++++++ .../StreamNetworkThroughputBenchmarkTests.java | 74 +++++ 13 files changed, 662 insertions(+), 702 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java deleted file mode 100644 index 6018e55..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.benchmark; - -import org.apache.flink.core.testutils.CheckedThread; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.types.LongValue; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; - -/** - * Wrapping thread around {@link RecordWriter} that sends a fixed number of LongValue(0) - * records. - */ -public class LongRecordWriterThread extends CheckedThread { - private final RecordWriter recordWriter; - - /** - * Future to wait on a definition of the number of records to send. - */ - private CompletableFuture recordsToSend = new CompletableFuture<>(); - - private volatile boolean running = true; - - public LongRecordWriterThread(RecordWriter recordWriter) { - this.recordWriter = checkNotNull(recordWriter); - } - - public void shutdown() { - running = false; - recordsToSend.complete(0L); - } - - /** - * Initializes the record writer thread with this many numbers to send. - * - *

If the thread was already started, if may now continue. - * - * @param records - * number of records to send - */ - public synchronized void setRecordsToSend(long records) { - checkState(!recordsToSend.isDone()); - recordsToSend.complete(records); - } - - private synchronized CompletableFuture getRecordsToSend() { - return recordsToSend; - } - - private synchronized void finishSendingRecords() { - recordsToSend = new CompletableFuture<>(); - } - - @Override - public void go() throws Exception { - while (running) { - sendRecords(getRecordsToSend().get()); - } - } - - private void sendRecords(long records) throws IOException, InterruptedException { - LongValue value = new LongValue(0); - - for (int i = 1; i < records; i++) { - recordWriter.emit(value); - } - value.setValue(records); - recordWriter.broadcastEmit(value); - recordWriter.flush(); - - finishSendingRecords(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java deleted file mode 100644 index ff06187..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.benchmark; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; -import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; -import org.apache.flink.runtime.deployment.ResultPartitionLocation; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.ConnectionID; -import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.buffer.BufferPool; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartition; -import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.taskmanager.TaskActions; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Arrays; - -import static org.apache.flink.util.ExceptionUtils.suppressExceptions; - -/** - * Context for network benchmarks executed by the external - * flink-benchmarks project. - */ -public class NetworkBenchmarkEnvironment { - - private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); - - private static final int NUM_SLOTS_AND_THREADS = 1; - - private static final InetAddress LOCAL_ADDRESS; - - static { - try { - LOCAL_ADDRESS = InetAddress.getLocalHost(); - } catch (UnknownHostException e) { - throw new Error(e); - } - } - - protected final JobID jobId = new JobID(); - protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID(); - protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(); - - protected NetworkEnvironment senderEnv; - protected NetworkEnvironment receiverEnv; - protected IOManager ioManager; - - protected int channels; - - protected ResultPartitionID[] partitionIds; - - public void setUp(int writers, int channels) throws Exception { - this.channels = channels; - this.partitionIds = new ResultPartitionID[writers]; - - int bufferPoolSize = Math.max(2048, writers * channels * 4); - senderEnv = createNettyNetworkEnvironment(bufferPoolSize); - receiverEnv = createNettyNetworkEnvironment(bufferPoolSize); - ioManager = new IOManagerAsync(); - - senderEnv.start(); - receiverEnv.start(); - - generatePartitionIds(); - } - - public void tearDown() { - suppressExceptions(senderEnv::shutdown); - suppressExceptions(receiverEnv::shutdown); - suppressExceptions(ioManager::shutdown); - } - - public SerializingLongReceiver createReceiver() throws Exception { - TaskManagerLocation senderLocation = new TaskManagerLocation( - ResourceID.generate(), - LOCAL_ADDRESS, - senderEnv.getConnectionManager().getDataPort()); - - InputGate receiverGate = createInputGate( - jobId, - dataSetID, - executionAttemptID, - senderLocation, - receiverEnv, - channels); - - SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length); - - receiver.start(); - return receiver; - } - - public RecordWriter createRecordWriter(int partitionIndex) throws Exception { - ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels); - return new RecordWriter<>(sender); - } - - private void generatePartitionIds() throws Exception { - for (int writer = 0; writer < partitionIds.length; writer++) { - partitionIds[writer] = new ResultPartitionID(); - } - } - - private NetworkEnvironment createNettyNetworkEnvironment( - @SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception { - - final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE); - - final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager( - new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration())); - - return new NetworkEnvironment( - bufferPool, - nettyConnectionManager, - new ResultPartitionManager(), - new TaskEventDispatcher(), - new KvStateRegistry(), - null, - null, - IOMode.SYNC, - TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(), - TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(), - TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(), - TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue()); - } - - protected ResultPartitionWriter createResultPartition( - JobID jobId, - ResultPartitionID partitionId, - NetworkEnvironment environment, - int channels) throws Exception { - - ResultPartition resultPartition = new ResultPartition( - "sender task", - new NoOpTaskActions(), - jobId, - partitionId, - ResultPartitionType.PIPELINED_BOUNDED, - channels, - 1, - environment.getResultPartitionManager(), - new NoOpResultPartitionConsumableNotifier(), - ioManager, - false); - - // similar to NetworkEnvironment#registerTask() - int numBuffers = resultPartition.getNumberOfSubpartitions() * - TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + - TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); - - BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers); - resultPartition.registerBufferPool(bufferPool); - - environment.getResultPartitionManager().registerResultPartition(resultPartition); - - return resultPartition; - } - - private InputGate createInputGate( - JobID jobId, - IntermediateDataSetID dataSetID, - ExecutionAttemptID executionAttemptID, - final TaskManagerLocation senderLocation, - NetworkEnvironment environment, - final int channels) throws IOException { - - InputGate[] gates = new InputGate[channels]; - for (int channel = 0; channel < channels; ++channel) { - int finalChannel = channel; - InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds) - .map(partitionId -> new InputChannelDeploymentDescriptor( - partitionId, - ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel)))) - .toArray(InputChannelDeploymentDescriptor[]::new); - - final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor( - dataSetID, - ResultPartitionType.PIPELINED_BOUNDED, - channel, - channelDescriptors); - - SingleInputGate gate = SingleInputGate.create( - "receiving task[" + channel + "]", - jobId, - executionAttemptID, - gateDescriptor, - environment, - new NoOpTaskActions(), - UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); - - // similar to NetworkEnvironment#registerTask() - int numBuffers = gate.getNumberOfInputChannels() * - TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + - TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); - - BufferPool bufferPool = - environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers); - - gate.setBufferPool(bufferPool); - gates[channel] = gate; - } - - if (channels > 1) { - return new UnionInputGate(gates); - } else { - return gates[0]; - } - } - - // ------------------------------------------------------------------------ - // Mocks - // ------------------------------------------------------------------------ - - /** - * A dummy implementation of the {@link TaskActions}. We implement this here rather than using Mockito - * to avoid using mockito in this benchmark class. - */ - private static final class NoOpTaskActions implements TaskActions { - - @Override - public void triggerPartitionProducerStateCheck( - JobID jobId, - IntermediateDataSetID intermediateDataSetId, - ResultPartitionID resultPartitionId) {} - - @Override - public void failExternally(Throwable cause) {} - } - - private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier { - - @Override - public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {} - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java deleted file mode 100644 index 799b7c3..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.benchmark; - -import org.apache.flink.types.LongValue; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -/** - * Network throughput benchmarks executed by the external - * flink-benchmarks project. - */ -public class NetworkThroughputBenchmark { - private static final long RECEIVER_TIMEOUT = 30_000; - - private NetworkBenchmarkEnvironment environment; - private ReceiverThread receiver; - private LongRecordWriterThread[] writerThreads; - - /** - * Executes the throughput benchmark with the given number of records. - * - * @param records - * records to pass through the network stack - */ - public void executeBenchmark(long records) throws Exception { - final LongValue value = new LongValue(); - value.setValue(0); - - long lastRecord = records / writerThreads.length; - CompletableFuture recordsReceived = receiver.setExpectedRecord(lastRecord); - - for (LongRecordWriterThread writerThread : writerThreads) { - writerThread.setRecordsToSend(lastRecord); - } - - recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS); - } - - /** - * Initializes the throughput benchmark with the given parameters. - * - * @param recordWriters - * number of senders, i.e. - * {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances - * @param channels - * number of outgoing channels / receivers - */ - public void setUp(int recordWriters, int channels) throws Exception { - environment = new NetworkBenchmarkEnvironment<>(); - environment.setUp(recordWriters, channels); - receiver = environment.createReceiver(); - writerThreads = new LongRecordWriterThread[recordWriters]; - for (int writer = 0; writer < recordWriters; writer++) { - writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer)); - writerThreads[writer].start(); - } - } - - /** - * Shuts down a benchmark previously set up via {@link #setUp}. - * - *

This will wait for all senders to finish but timeout with an exception after 5 seconds. - */ - public void tearDown() throws Exception { - for (LongRecordWriterThread writerThread : writerThreads) { - writerThread.shutdown(); - writerThread.sync(5000); - } - environment.tearDown(); - receiver.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java deleted file mode 100644 index c84743b..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.benchmark; - -import org.junit.Test; - -/** - * Tests for various network benchmarks based on {@link NetworkThroughputBenchmark}. - */ -public class NetworkThroughputBenchmarkTests { - @Test - public void pointToPointBenchmark() throws Exception { - NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); - benchmark.setUp(1, 1); - try { - benchmark.executeBenchmark(1_000); - } - finally { - benchmark.tearDown(); - } - } - - @Test - public void pointToMultiPointBenchmark() throws Exception { - NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); - benchmark.setUp(1, 100); - try { - benchmark.executeBenchmark(1_000); - } - finally { - benchmark.tearDown(); - } - } - - @Test - public void multiPointToPointBenchmark() throws Exception { - NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); - benchmark.setUp(4, 1); - try { - benchmark.executeBenchmark(1_000); - } - finally { - benchmark.tearDown(); - } - } - - @Test - public void multiPointToMultiPointBenchmark() throws Exception { - NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); - benchmark.setUp(4, 100); - try { - benchmark.executeBenchmark(1_000); - } - finally { - benchmark.tearDown(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java deleted file mode 100644 index be1c80f..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.benchmark; - -import org.apache.flink.core.testutils.CheckedThread; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkState; - -/** - * This class waits for {@code expectedRepetitionsOfExpectedRecord} number of occurrences of the - * {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is correlated with number of input channels. - */ -public abstract class ReceiverThread extends CheckedThread { - protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class); - - protected final int expectedRepetitionsOfExpectedRecord; - - protected int expectedRecordCounter; - protected CompletableFuture expectedRecord = new CompletableFuture<>(); - protected CompletableFuture recordsProcessed = new CompletableFuture<>(); - - protected volatile boolean running; - - ReceiverThread(int expectedRepetitionsOfExpectedRecord) { - setName(this.getClass().getName()); - - this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord; - this.running = true; - } - - public synchronized CompletableFuture setExpectedRecord(long record) { - checkState(!expectedRecord.isDone()); - checkState(!recordsProcessed.isDone()); - expectedRecord.complete(record); - expectedRecordCounter = 0; - return recordsProcessed; - } - - private synchronized CompletableFuture getExpectedRecord() { - return expectedRecord; - } - - private synchronized void finishProcessingExpectedRecords() { - checkState(expectedRecord.isDone()); - checkState(!recordsProcessed.isDone()); - - recordsProcessed.complete(null); - expectedRecord = new CompletableFuture<>(); - recordsProcessed = new CompletableFuture<>(); - } - - @Override - public void go() throws Exception { - try { - while (running) { - readRecords(getExpectedRecord().get()); - finishProcessingExpectedRecords(); - } - } - catch (InterruptedException e) { - if (running) { - throw e; - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - protected abstract void readRecords(long lastExpectedRecord) throws Exception; - - public void shutdown() { - running = false; - interrupt(); - expectedRecord.complete(0L); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java deleted file mode 100644 index 848c018..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.benchmark; - -import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.types.LongValue; - -/** - * {@link ReceiverThread} that deserialize incoming messages. - */ -public class SerializingLongReceiver extends ReceiverThread { - - private final MutableRecordReader reader; - - @SuppressWarnings("WeakerAccess") - public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) { - super(expectedRepetitionsOfExpectedRecord); - this.reader = new MutableRecordReader<>( - inputGate, - new String[]{ - EnvironmentInformation.getTemporaryFileDirectory() - }); - } - - protected void readRecords(long lastExpectedRecord) throws Exception { - LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord); - final LongValue value = new LongValue(); - - while (running && reader.next(value)) { - final long ts = value.getValue(); - if (ts == lastExpectedRecord) { - expectedRecordCounter++; - if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) { - break; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java new file mode 100644 index 0000000..e6cc2d5 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.types.LongValue; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Wrapping thread around {@link RecordWriter} that sends a fixed number of LongValue(0) + * records. + */ +public class LongRecordWriterThread extends CheckedThread { + private final RecordWriter recordWriter; + + /** + * Future to wait on a definition of the number of records to send. + */ + private CompletableFuture recordsToSend = new CompletableFuture<>(); + + private volatile boolean running = true; + + public LongRecordWriterThread(RecordWriter recordWriter) { + this.recordWriter = checkNotNull(recordWriter); + } + + public void shutdown() { + running = false; + recordsToSend.complete(0L); + } + + /** + * Initializes the record writer thread with this many numbers to send. + * + *

If the thread was already started, if may now continue. + * + * @param records + * number of records to send + */ + public synchronized void setRecordsToSend(long records) { + checkState(!recordsToSend.isDone()); + recordsToSend.complete(records); + } + + private synchronized CompletableFuture getRecordsToSend() { + return recordsToSend; + } + + private synchronized void finishSendingRecords() { + recordsToSend = new CompletableFuture<>(); + } + + @Override + public void go() throws Exception { + while (running) { + sendRecords(getRecordsToSend().get()); + } + } + + private void sendRecords(long records) throws IOException, InterruptedException { + LongValue value = new LongValue(0); + + for (int i = 1; i < records; i++) { + recordWriter.emit(value); + } + value.setValue(records); + recordWriter.broadcastEmit(value); + recordWriter.flush(); + + finishSendingRecords(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java new file mode 100644 index 0000000..126efef --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/ReceiverThread.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.core.testutils.CheckedThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This class waits for {@code expectedRepetitionsOfExpectedRecord} number of occurrences of the + * {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is correlated with number of input channels. + */ +public abstract class ReceiverThread extends CheckedThread { + protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class); + + protected final int expectedRepetitionsOfExpectedRecord; + + protected int expectedRecordCounter; + protected CompletableFuture expectedRecord = new CompletableFuture<>(); + protected CompletableFuture recordsProcessed = new CompletableFuture<>(); + + protected volatile boolean running; + + ReceiverThread(int expectedRepetitionsOfExpectedRecord) { + setName(this.getClass().getName()); + + this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord; + this.running = true; + } + + public synchronized CompletableFuture setExpectedRecord(long record) { + checkState(!expectedRecord.isDone()); + checkState(!recordsProcessed.isDone()); + expectedRecord.complete(record); + expectedRecordCounter = 0; + return recordsProcessed; + } + + private synchronized CompletableFuture getExpectedRecord() { + return expectedRecord; + } + + private synchronized void finishProcessingExpectedRecords() { + checkState(expectedRecord.isDone()); + checkState(!recordsProcessed.isDone()); + + recordsProcessed.complete(null); + expectedRecord = new CompletableFuture<>(); + recordsProcessed = new CompletableFuture<>(); + } + + @Override + public void go() throws Exception { + try { + while (running) { + readRecords(getExpectedRecord().get()); + finishProcessingExpectedRecords(); + } + } + catch (InterruptedException e) { + if (running) { + throw e; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + protected abstract void readRecords(long lastExpectedRecord) throws Exception; + + public void shutdown() { + running = false; + interrupt(); + expectedRecord.complete(0L); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java new file mode 100644 index 0000000..580612c --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.types.LongValue; + +/** + * {@link ReceiverThread} that deserialize incoming messages. + */ +public class SerializingLongReceiver extends ReceiverThread { + + private final MutableRecordReader reader; + + @SuppressWarnings("WeakerAccess") + public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) { + super(expectedRepetitionsOfExpectedRecord); + this.reader = new MutableRecordReader<>( + inputGate, + new String[]{ + EnvironmentInformation.getTemporaryFileDirectory() + }); + } + + protected void readRecords(long lastExpectedRecord) throws Exception { + LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord); + final LongValue value = new LongValue(); + + while (running && reader.next(value)) { + final long ts = value.getValue(); + if (ts == lastExpectedRecord) { + expectedRecordCounter++; + if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) { + break; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index acbbdf8..83508ea 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -18,23 +18,262 @@ package org.apache.flink.streaming.runtime.io.benchmark; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionLocation; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector; -import org.apache.flink.runtime.io.network.benchmark.NetworkBenchmarkEnvironment; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.netty.NettyConfig; +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.taskmanager.TaskActions; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.streaming.runtime.io.StreamRecordWriter; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; + +import static org.apache.flink.util.ExceptionUtils.suppressExceptions; + /** - * Context for stream network benchmarks executed by the external + * Context for network benchmarks executed by the external * flink-benchmarks project. */ -public class StreamNetworkBenchmarkEnvironment extends NetworkBenchmarkEnvironment { +public class StreamNetworkBenchmarkEnvironment { + + private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); + + private static final int NUM_SLOTS_AND_THREADS = 1; + + private static final InetAddress LOCAL_ADDRESS; + + static { + try { + LOCAL_ADDRESS = InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new Error(e); + } + } + + protected final JobID jobId = new JobID(); + protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID(); + protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(); + + protected NetworkEnvironment senderEnv; + protected NetworkEnvironment receiverEnv; + protected IOManager ioManager; + + protected int channels; + + protected ResultPartitionID[] partitionIds; + + public void setUp(int writers, int channels) throws Exception { + this.channels = channels; + this.partitionIds = new ResultPartitionID[writers]; + + int bufferPoolSize = Math.max(2048, writers * channels * 4); + senderEnv = createNettyNetworkEnvironment(bufferPoolSize); + receiverEnv = createNettyNetworkEnvironment(bufferPoolSize); + ioManager = new IOManagerAsync(); + + senderEnv.start(); + receiverEnv.start(); + + generatePartitionIds(); + } + + public void tearDown() { + suppressExceptions(senderEnv::shutdown); + suppressExceptions(receiverEnv::shutdown); + suppressExceptions(ioManager::shutdown); + } + + public SerializingLongReceiver createReceiver() throws Exception { + TaskManagerLocation senderLocation = new TaskManagerLocation( + ResourceID.generate(), + LOCAL_ADDRESS, + senderEnv.getConnectionManager().getDataPort()); + + InputGate receiverGate = createInputGate( + jobId, + dataSetID, + executionAttemptID, + senderLocation, + receiverEnv, + channels); + + SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length); + + receiver.start(); + return receiver; + } + + public StreamRecordWriter createRecordWriter(int partitionIndex, long flushTimeout) throws Exception { + ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels); + return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector(), flushTimeout); + } + + private void generatePartitionIds() throws Exception { + for (int writer = 0; writer < partitionIds.length; writer++) { + partitionIds[writer] = new ResultPartitionID(); + } + } + + private NetworkEnvironment createNettyNetworkEnvironment( + @SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception { + + final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE); + + final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager( + new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration())); + + return new NetworkEnvironment( + bufferPool, + nettyConnectionManager, + new ResultPartitionManager(), + new TaskEventDispatcher(), + new KvStateRegistry(), + null, + null, + IOMode.SYNC, + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(), + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(), + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(), + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue()); + } + + protected ResultPartitionWriter createResultPartition( + JobID jobId, + ResultPartitionID partitionId, + NetworkEnvironment environment, + int channels) throws Exception { + + ResultPartition resultPartition = new ResultPartition( + "sender task", + new NoOpTaskActions(), + jobId, + partitionId, + ResultPartitionType.PIPELINED_BOUNDED, + channels, + 1, + environment.getResultPartitionManager(), + new NoOpResultPartitionConsumableNotifier(), + ioManager, + false); + + // similar to NetworkEnvironment#registerTask() + int numBuffers = resultPartition.getNumberOfSubpartitions() * + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); + + BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers); + resultPartition.registerBufferPool(bufferPool); + + environment.getResultPartitionManager().registerResultPartition(resultPartition); + + return resultPartition; + } + + private InputGate createInputGate( + JobID jobId, + IntermediateDataSetID dataSetID, + ExecutionAttemptID executionAttemptID, + final TaskManagerLocation senderLocation, + NetworkEnvironment environment, + final int channels) throws IOException { + + InputGate[] gates = new InputGate[channels]; + for (int channel = 0; channel < channels; ++channel) { + int finalChannel = channel; + InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds) + .map(partitionId -> new InputChannelDeploymentDescriptor( + partitionId, + ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel)))) + .toArray(InputChannelDeploymentDescriptor[]::new); + + final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor( + dataSetID, + ResultPartitionType.PIPELINED_BOUNDED, + channel, + channelDescriptors); + + SingleInputGate gate = SingleInputGate.create( + "receiving task[" + channel + "]", + jobId, + executionAttemptID, + gateDescriptor, + environment, + new NoOpTaskActions(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); + + // similar to NetworkEnvironment#registerTask() + int numBuffers = gate.getNumberOfInputChannels() * + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); + + BufferPool bufferPool = + environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers); + + gate.setBufferPool(bufferPool); + gates[channel] = gate; + } + + if (channels > 1) { + return new UnionInputGate(gates); + } else { + return gates[0]; + } + } + + // ------------------------------------------------------------------------ + // Mocks + // ------------------------------------------------------------------------ + + /** + * A dummy implementation of the {@link TaskActions}. We implement this here rather than using Mockito + * to avoid using mockito in this benchmark class. + */ + private static final class NoOpTaskActions implements TaskActions { + + @Override + public void triggerPartitionProducerStateCheck( + JobID jobId, + IntermediateDataSetID intermediateDataSetId, + ResultPartitionID resultPartitionId) {} + + @Override + public void failExternally(Throwable cause) {} + } + + private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier { - public RecordWriter createStreamRecordWriter(int partitionIndex, long flushTimeout) - throws Exception { - ResultPartitionWriter sender = - createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels); - return new StreamRecordWriter<>(sender, new RoundRobinChannelSelector<>(), flushTimeout); + @Override + public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {} } } http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java index 9286485..843d3e2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.io.benchmark; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.io.network.benchmark.ReceiverThread; import org.apache.flink.types.LongValue; import java.util.concurrent.CompletableFuture; @@ -74,7 +73,7 @@ public class StreamNetworkPointToPointBenchmark { environment.setUp(1, 1); receiver = environment.createReceiver(); - recordWriter = environment.createStreamRecordWriter(0, flushTimeout); + recordWriter = environment.createRecordWriter(0, flushTimeout); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java new file mode 100644 index 0000000..3f41b00 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.apache.flink.types.LongValue; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Network throughput benchmarks executed by the external + * flink-benchmarks project. + */ +public class StreamNetworkThroughputBenchmark { + private static final long RECEIVER_TIMEOUT = 30_000; + + private StreamNetworkBenchmarkEnvironment environment; + private ReceiverThread receiver; + private LongRecordWriterThread[] writerThreads; + + /** + * Executes the throughput benchmark with the given number of records. + * + * @param records + * records to pass through the network stack + */ + public void executeBenchmark(long records) throws Exception { + final LongValue value = new LongValue(); + value.setValue(0); + + long lastRecord = records / writerThreads.length; + CompletableFuture recordsReceived = receiver.setExpectedRecord(lastRecord); + + for (LongRecordWriterThread writerThread : writerThreads) { + writerThread.setRecordsToSend(lastRecord); + } + + recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS); + } + + /** + * Initializes the throughput benchmark with the given parameters. + * + * @param recordWriters + * number of senders, i.e. + * {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances + * @param channels + * number of outgoing channels / receivers + */ + public void setUp(int recordWriters, int channels, int flushTimeout) throws Exception { + environment = new StreamNetworkBenchmarkEnvironment<>(); + environment.setUp(recordWriters, channels); + receiver = environment.createReceiver(); + writerThreads = new LongRecordWriterThread[recordWriters]; + for (int writer = 0; writer < recordWriters; writer++) { + writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer, flushTimeout)); + writerThreads[writer].start(); + } + } + + /** + * Shuts down a benchmark previously set up via {@link #setUp}. + * + *

This will wait for all senders to finish but timeout with an exception after 5 seconds. + */ + public void tearDown() throws Exception { + for (LongRecordWriterThread writerThread : writerThreads) { + writerThread.shutdown(); + writerThread.sync(5000); + } + environment.tearDown(); + receiver.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/544c9703/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java new file mode 100644 index 0000000..8af8148 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.benchmark; + +import org.junit.Test; + +/** + * Tests for various network benchmarks based on {@link StreamNetworkThroughputBenchmark}. + */ +public class StreamNetworkThroughputBenchmarkTests { + @Test + public void pointToPointBenchmark() throws Exception { + StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + benchmark.setUp(1, 1, 100); + try { + benchmark.executeBenchmark(1_000); + } + finally { + benchmark.tearDown(); + } + } + + @Test + public void pointToMultiPointBenchmark() throws Exception { + StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + benchmark.setUp(1, 100, 100); + try { + benchmark.executeBenchmark(1_000); + } + finally { + benchmark.tearDown(); + } + } + + @Test + public void multiPointToPointBenchmark() throws Exception { + StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + benchmark.setUp(4, 1, 100); + try { + benchmark.executeBenchmark(1_000); + } + finally { + benchmark.tearDown(); + } + } + + @Test + public void multiPointToMultiPointBenchmark() throws Exception { + StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark(); + benchmark.setUp(4, 100, 100); + try { + benchmark.executeBenchmark(1_000); + } + finally { + benchmark.tearDown(); + } + } +}