flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure
Date Thu, 21 Jun 2018 13:43:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519376#comment-16519376 ]

ASF GitHub Bot commented on FLINK-9374:
---------------------------------------

Github user fmthoma commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197137205
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java ---
    @@ -267,6 +268,79 @@ public void go() throws Exception {
     		testHarness.close();
     	}
     
    +	/**
    +	 * Test ensuring that the producer blocks if the queue limit is exceeded,
    +	 * until the queue length drops below the limit;
    +	 * we set a timeout because the test will not finish if the logic is broken.
    +	 */
    +	@Test(timeout = 10000)
    +	public void testBackpressure() throws Throwable {
    +		final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +		producer.setQueueLimit(1);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +				new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +		testHarness.open();
    +
    +		UserRecordResult result = mock(UserRecordResult.class);
    +		when(result.isSuccessful()).thenReturn(true);
    +
    +		CheckedThread msg1 = new CheckedThread() {
    +			@Override
    +			public void go() throws Exception {
    +				testHarness.processElement(new StreamRecord<>("msg-1"));
    +			}
    +		};
    +		msg1.start();
    +		msg1.trySync(100);
    +		assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());
    --- End diff --
    
    @tzulitai In principle, yes, if the call `testHarness.processElement(…)` takes more
than 100 milliseconds. However, I believe this is very unlikely even on slow systems, since
the operation is mostly (entirely?) CPU bound. If test failures occur nevertheless, it should
be no problem to increase the timeout for `msg1` and `msg2`.
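
For context, since the diff above cuts off before `msg2`, here is an illustrative sketch (not the actual code from the pull request) of the kind of follow-up check these timeouts refer to: a second `CheckedThread` sends another record while the queue limit is already reached, and the test expects it to stay blocked until the queue has been flushed.

    CheckedThread msg2 = new CheckedThread() {
        @Override
        public void go() throws Exception {
            // With queueLimit = 1 and one record already outstanding, this call
            // should block inside the producer until the queue drains.
            testHarness.processElement(new StreamRecord<>("msg-2"));
        }
    };
    msg2.start();
    msg2.trySync(100);
    assertTrue("Expected the producer to block while the queue limit is exceeded", msg2.isAlive());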


> Flink Kinesis Producer does not backpressure
> --------------------------------------------
>
>                 Key: FLINK-9374
>                 URL: https://issues.apache.org/jira/browse/FLINK-9374
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>            Reporter: Franz Thoma
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1 MB per second per shard, this queue may grow indefinitely if Flink sends records faster than the KPL can forward them to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued records are dropped after a certain amount of time, but this leads to data loss under high load.
> Currently the only time the queue is flushed is during checkpointing: the {{FlinkKinesisProducer}} consumes records at an arbitrary rate, either until a checkpoint is reached (and then waits until the queue is flushed), or until it runs out of memory, whichever happens first. (This is made worse by the fact that the Java KPL is only a thin wrapper around a C++ process, so it is not even the Java process that runs out of memory, but the C++ process.) The implicit rate limit imposed by checkpointing leads to a ragged throughput graph like this (the periods with zero throughput are the wait times before a checkpoint):
> !before.png! Throughput limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a maximum number of records that may be waiting in the KPL queue. If this limit is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and wait (blocking) until the queue length drops below the limit again. This automatically leads to backpressure, since the {{FlinkKinesisProducer}} cannot accept records while waiting. For compatibility, {{queueLimit}} defaults to {{Integer.MAX_VALUE}}, so the behavior is unchanged unless a client explicitly sets a value. Unfortunately, setting a »sane« default value is not possible, since sensible values for the limit depend on the record size (the limit should be chosen so that about 10–100 MB of records per shard are accumulated before flushing, otherwise the maximum Kinesis throughput may not be reached).
> !after.png! Throughput with a queue limit of 100000 records (the spikes are checkpoints, where the queue is still flushed completely)
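
To make the proposed mechanism more concrete, here is a minimal, illustrative sketch of how such a queue limit could be enforced against the KPL. This is not the actual FlinkKinesisProducer change; the class and field names (`QueueLimitSketch`, `producer`, `queueLimit`) and the helper `enforceQueueLimit()` are assumptions made for illustration. It relies only on the KPL's existing `getOutstandingRecordsCount()` and `flush()` methods.

    import com.amazonaws.services.kinesis.producer.KinesisProducer;

    /** Illustrative sketch only: blocks the caller while the KPL backlog exceeds a limit. */
    class QueueLimitSketch {

        private final KinesisProducer producer; // the KPL producer held by the sink
        private final long queueLimit;           // maximum number of unsent records allowed

        QueueLimitSketch(KinesisProducer producer, long queueLimit) {
            this.producer = producer;
            this.queueLimit = queueLimit;
        }

        /**
         * Called before handing a new record to the KPL. Blocking here is what
         * propagates backpressure to the upstream Flink operators.
         */
        void enforceQueueLimit() throws InterruptedException {
            while (producer.getOutstandingRecordsCount() >= queueLimit) {
                producer.flush();  // ask the KPL to send what it has queued
                Thread.sleep(500); // give the KPL time to drain the backlog
            }
        }
    }

A sink would call `enforceQueueLimit()` at the start of its `invoke()` method, before adding the record to the KPL, so that an overloaded queue stalls the operator instead of growing without bound.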



