flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9560) RateLimiting for FileSystem
Date Fri, 15 Jun 2018 09:54:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513621#comment-16513621 ]

ASF GitHub Bot commented on FLINK-9560:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6149#discussion_r195685352
  
    --- Diff: flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsFileSystemTest.java ---
    @@ -122,6 +122,42 @@ public void testLimitingInputStreams() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testLimitingRateLimitingStream() throws Exception {
    +		final LimitedConnectionsFileSystem limitedFs = new LimitedConnectionsFileSystem(
    +				LocalFileSystem.getSharedInstance(),
    +				Integer.MAX_VALUE,
    +				Integer.MAX_VALUE,
    +				Integer.MAX_VALUE,
    +				0,
    +				0,
    +				10000, // Limit write to 10 kbytes/s
    +				10000); // Limit read to 10 kbytes/s
    +		File file = tempFolder.newFile();
    +		Path path = new Path(file.toURI());
    +		long durationWrite = System.currentTimeMillis();
    +		try (FSDataOutputStream stream = limitedFs.create(path, WriteMode.OVERWRITE)) {
    +			final Random rnd = new Random();
    +			final byte[] data = new byte[100];
    +			for (int i = 0; i < (1000 + 10); i++) {
    +				rnd.nextBytes(data);
    +				stream.write(data);
    +			}
    +		}
    +		durationWrite = System.currentTimeMillis() - durationWrite;
    +
    +		long durationRead = System.currentTimeMillis();
    +		final byte[] data = new byte[100];
    +		try (FSDataInputStream stream = limitedFs.open(path)) {
    +			//noinspection StatementWithEmptyBody
    +			while (stream.read(data) != -1) {}
    +		}
    +		durationRead = System.currentTimeMillis() - durationRead;
    +		file.delete();
    +		assertTrue(durationWrite > 10000);
    +		assertTrue(durationRead > 8000); // Less stability with read limiter than write
    --- End diff --
    
    I think burst credits accumulate over time. If you create the RateLimiter in the
    `LimitedConnectionsFileSystem` constructor and immediately start using the output rate
    limiter, it will not have accumulated any burst credits. The input rate limiter, however,
    is used only after the write has finished (more than ten seconds later in this test), so
    by then it has accumulated burst credits.
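    A rough worked illustration of this point: the test writes 1,010 x 100 = 101,000 bytes,
    which at 10,000 bytes/s takes at least about 10.1 s. If the input limiter banks unused
    permits while it sits idle, and assuming the bank is capped at one second's worth
    (10,000 bytes; the cap is an assumption, not something stated in the PR), the read
    needs only about 9.1 s, which would explain why the read assertion needed a looser
    bound. The sketch below models this with a hypothetical `TokenBucket` on a virtual
    clock so it runs instantly; it is not an existing library's implementation.

```java
import java.util.Locale;

/**
 * Hypothetical token-bucket sketch on a virtual clock (no real sleeping), used only to
 * illustrate how an idle limiter accumulates burst credits.
 */
public class BurstCreditsDemo {

    static final class TokenBucket {
        private final double bytesPerSec; // refill rate
        private final double capacity;    // maximum stored burst credits, in bytes (assumed cap)
        private double tokens = 0.0;      // a freshly created bucket holds no credits
        private double lastRefillSec;     // virtual time of the last refill

        TokenBucket(double bytesPerSec, double capacity, double createdAtSec) {
            this.bytesPerSec = bytesPerSec;
            this.capacity = capacity;
            this.lastRefillSec = createdAtSec;
        }

        /** Returns the virtual time at which `bytes` (at most `capacity`) may be sent. */
        double acquire(int bytes, double nowSec) {
            // Credit the time elapsed since the last refill, capped at `capacity`.
            tokens = Math.min(capacity, tokens + (nowSec - lastRefillSec) * bytesPerSec);
            lastRefillSec = nowSec;
            if (tokens >= bytes) {
                tokens -= bytes; // enough stored credits: send immediately
                return nowSec;
            }
            // Otherwise wait exactly until the missing credits have been refilled.
            double readySec = nowSec + (bytes - tokens) / bytesPerSec;
            tokens = 0.0;
            lastRefillSec = readySec;
            return readySec;
        }
    }

    /** Sends `chunks` chunks of `chunkSize` bytes back to back; returns elapsed virtual seconds. */
    static double transfer(TokenBucket bucket, double startSec, int chunks, int chunkSize) {
        double now = startSec;
        for (int i = 0; i < chunks; i++) {
            now = bucket.acquire(chunkSize, now);
        }
        return now - startSec;
    }

    public static void main(String[] args) {
        double rate = 10_000;     // 10 kB/s, as in the test
        double capacity = 10_000; // assumption: at most one second's worth of burst credits
        // Both limiters are created at t = 0, as if in the file system's constructor.
        TokenBucket output = new TokenBucket(rate, capacity, 0.0);
        TokenBucket input = new TokenBucket(rate, capacity, 0.0);
        double write = transfer(output, 0.0, 1010, 100); // used immediately: no credits yet
        double read = transfer(input, write, 1010, 100); // idle until the write has finished
        System.out.println(String.format(Locale.ROOT, "write: %.2f s, read: %.2f s", write, read));
    }
}
```

    Running `main` prints `write: 10.10 s, read: 9.10 s`: the first 100 read chunks are
    covered by the banked credits, and only the remaining 910 chunks are paced.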
    
    Btw, that makes me think: do we really want two separate limiters for reads and writes?
    Are the S3 quotas also accounted for separately, or is there usually a single quota
    covering reads and writes together? If the latter, it would be handier to have one
    `RateLimiter` shared between reads and writes.
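    To make the trade-off concrete, a small virtual-clock simulation: with one limiter per
    direction, a concurrent writer and reader together move roughly twice the configured
    rate, whereas a single shared limiter caps their combined throughput. `PacingLimiter`
    and `runConcurrently` are hypothetical names, and the limiter is a plain pacing
    limiter without burst credits, chosen only to keep the arithmetic simple.

```java
import java.util.Locale;

/**
 * Hypothetical virtual-clock simulation comparing separate read/write limiters with a
 * single shared limiter. The limiter is a plain pacing limiter without burst credits.
 */
public class SharedLimiterDemo {

    /** Hands out back-to-back transfer slots at a fixed byte rate. */
    static final class PacingLimiter {
        private final double bytesPerSec;
        private double nextFreeSec = 0.0; // virtual time at which the last reserved slot ends

        PacingLimiter(double bytesPerSec) {
            this.bytesPerSec = bytesPerSec;
        }

        /** Reserves a slot for `bytes` requested at `nowSec`; returns when the transfer ends. */
        double acquire(int bytes, double nowSec) {
            double start = Math.max(nowSec, nextFreeSec);
            nextFreeSec = start + bytes / bytesPerSec;
            return nextFreeSec;
        }
    }

    /**
     * Interleaves a writer and a reader chunk by chunk, each with its own virtual clock, so they
     * behave like two concurrent streams. Returns the virtual time at which both have finished.
     */
    static double runConcurrently(PacingLimiter writeLimiter, PacingLimiter readLimiter,
                                  int chunks, int chunkSize) {
        double writerTime = 0.0;
        double readerTime = 0.0;
        for (int i = 0; i < chunks; i++) {
            writerTime = writeLimiter.acquire(chunkSize, writerTime);
            readerTime = readLimiter.acquire(chunkSize, readerTime);
        }
        return Math.max(writerTime, readerTime);
    }

    public static void main(String[] args) {
        double rate = 10_000; // 10 kB/s
        // Separate limiters: each direction gets its own 10 kB/s (about 20 kB/s combined).
        double separate = runConcurrently(new PacingLimiter(rate), new PacingLimiter(rate), 1010, 100);
        // One shared limiter: reads and writes together stay at 10 kB/s.
        PacingLimiter shared = new PacingLimiter(rate);
        double together = runConcurrently(shared, shared, 1010, 100);
        System.out.println(String.format(Locale.ROOT, "separate: %.2f s, shared: %.2f s", separate, together));
    }
}
```

    Running `main` prints `separate: 10.10 s, shared: 20.20 s`. Which behaviour is right
    depends on whether the storage quota counts reads and writes together or separately.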


> RateLimiting for FileSystem
> ---------------------------
>
>                 Key: FLINK-9560
>                 URL: https://issues.apache.org/jira/browse/FLINK-9560
>             Project: Flink
>          Issue Type: Improvement
>          Components: FileSystem
>    Affects Versions: 1.5.0
>            Reporter: Etienne CARRIERE
>            Priority: Major
>
> *Pain*: On our system, we see that during checkpoints all the bandwidth is taken up by sending the checkpoint to object storage (S3 in our case).
> *Proposal*: Now that some limits exist on FileSystem (mostly on the number of connections, via tickets FLINK-8125/FLINK-8198/FLINK-9468), I propose adding rate limiting "per FileSystem".
> *Proposal of implementation*: Modify LimitedConnectionsFileSystem to add a rate limiter to both the input and output streams.
>  
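A minimal sketch of what the implementation proposal could look like: decorators that charge every transferred byte against a limiter, before delegating for writes and after delegating for reads. All names here (`ByteRateLimiter`, `SleepingLimiter`, `RateLimitedOutputStream`, `RateLimitedInputStream`) are hypothetical, the wrappers use plain `java.io` types rather than Flink's `FSDataInputStream`/`FSDataOutputStream`, and the limiter is a simple pacing limiter with no burst credits.

```java
import java.io.*;

/** Hypothetical sketch of byte-rate-limited stream wrappers; not Flink's actual classes. */
public class RateLimitedStreams {

    /** Blocks the caller until `bytes` may be transferred. */
    interface ByteRateLimiter {
        void acquire(int bytes) throws InterruptedIOException;
    }

    /** Pacing limiter (no burst credits): each caller sleeps until its reserved slot ends. */
    static final class SleepingLimiter implements ByteRateLimiter {
        private final double bytesPerSec;
        private long nextFreeNanos = System.nanoTime(); // end of the last reserved slot

        SleepingLimiter(double bytesPerSec) {
            this.bytesPerSec = bytesPerSec;
        }

        @Override
        public void acquire(int bytes) throws InterruptedIOException {
            long sleepNanos;
            synchronized (this) { // reserve the slot atomically, but sleep outside the lock
                long now = System.nanoTime();
                long start = (nextFreeNanos - now > 0) ? nextFreeNanos : now;
                nextFreeNanos = start + (long) (bytes * 1e9 / bytesPerSec);
                sleepNanos = nextFreeNanos - now;
            }
            try {
                Thread.sleep(sleepNanos / 1_000_000, (int) (sleepNanos % 1_000_000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("interrupted while rate limiting");
            }
        }
    }

    /** Charges written bytes against the limiter before passing them on. */
    static final class RateLimitedOutputStream extends FilterOutputStream {
        private final ByteRateLimiter limiter;

        RateLimitedOutputStream(OutputStream out, ByteRateLimiter limiter) {
            super(out);
            this.limiter = limiter;
        }

        @Override
        public void write(int b) throws IOException {
            limiter.acquire(1);
            out.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            limiter.acquire(len);
            out.write(b, off, len); // avoid FilterOutputStream's byte-at-a-time default
        }
    }

    /** Charges bytes actually read (not bytes requested) against the limiter. */
    static final class RateLimitedInputStream extends FilterInputStream {
        private final ByteRateLimiter limiter;

        RateLimitedInputStream(InputStream in, ByteRateLimiter limiter) {
            super(in);
            this.limiter = limiter;
        }

        @Override
        public int read() throws IOException {
            int b = in.read();
            if (b >= 0) limiter.acquire(1);
            return b;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            int n = in.read(b, off, len);
            if (n > 0) limiter.acquire(n);
            return n;
        }
    }

    public static void main(String[] args) throws IOException {
        // One limiter shared by both wrappers = one combined budget for reads and writes.
        ByteRateLimiter limiter = new SleepingLimiter(20_000); // 20 kB/s, illustrative
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long t0 = System.nanoTime();
        try (OutputStream out = new RateLimitedOutputStream(sink, limiter)) {
            out.write(new byte[2_000]); // about 100 ms at 20 kB/s
        }
        byte[] buf = new byte[100];
        try (InputStream in = new RateLimitedInputStream(new ByteArrayInputStream(sink.toByteArray()), limiter)) {
            while (in.read(buf) != -1) { } // roughly another 100 ms to read the bytes back
        }
        System.out.println("elapsed ms: " + (System.nanoTime() - t0) / 1_000_000);
    }
}
```

Passing the same limiter instance to both wrappers, as `main` does, gives one combined read/write budget; passing two instances gives separate budgets. Reads are charged after the underlying `read` returns, so only bytes actually delivered are counted.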



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
