flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9897) Further enhance adaptive reads in Kinesis Connector to depend on run loop time
Date Tue, 31 Jul 2018 00:28:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16562699#comment-16562699 ]

ASF GitHub Bot commented on FLINK-9897:
---------------------------------------

glaksh100 commented on a change in pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive reads depend on run loop time instead of fetchintervalmillis
URL: https://github.com/apache/flink/pull/6408#discussion_r206363113
 
 

 ##########
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ##########
 @@ -233,26 +225,69 @@ public void run() {
 						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
 					long recordBatchSizeBytes = 0L;
-					long averageRecordSizeBytes = 0L;
-
 					for (UserRecord record : fetchedRecords) {
 						recordBatchSizeBytes += record.getData().remaining();
 						deserializeRecordForCollectionAndUpdateState(record);
 					}
 
-					if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
-						averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
-						maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-					}
-
 					nextShardItr = getRecordsResult.getNextShardIterator();
+
+					long processingEndTimeNanos = System.nanoTime();
+
+					long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+					long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
+					maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes);
+					processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
 				}
 			}
 		} catch (Throwable t) {
 			fetcherRef.stopWithError(t);
 		}
 	}
 
+	/**
+	 * Adjusts loop timing to match target frequency if specified.
+	 * @param processingStartTimeNanos The start time of the run loop "work"
+	 * @param processingEndTimeNanos The end time of the run loop "work"
+	 * @return The System.nanoTime() after the sleep (if any)
+	 * @throws InterruptedException
+	 */
+	protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
+		throws InterruptedException {
+		long endTimeNanos = processingEndTimeNanos;
+		if (fetchIntervalMillis != 0) {
+			long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
+			long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
+			if (sleepTimeMillis > 0) {
+				Thread.sleep(sleepTimeMillis);
+				endTimeNanos = System.nanoTime();
+			}
+		}
+		return endTimeNanos;
+	}
+
+	/**
+	 * Calculates how many records to read each time through the loop based on a target throughput
+	 * and the measured frequency of the loop.
+	 * @param runLoopTimeNanos The total time of one pass through the loop
+	 * @param numRecords The number of records of the last read operation
+	 * @param recordBatchSizeBytes The total batch size of the last read operation
+	 */
+	protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) {
 
 Review comment:
   I modified this to pass the `maxNumberOfRecordsPerFetch` as an argument. Let me know if that's the change you had in mind.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Further enhance adaptive reads in Kinesis Connector to depend on run loop time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-9897
>                 URL: https://issues.apache.org/jira/browse/FLINK-9897
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.4.2, 1.5.1
>            Reporter: Lakshmi Rao
>            Assignee: Lakshmi Rao
>            Priority: Major
>              Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the shardConsumer to adaptively read more
> records based on the current average record size, in order to make full use of the 2 MB/sec
> shard read limit. The feature maximizes maxNumberOfRecordsPerFetch under the assumption of
> 5 reads/sec (as prescribed by Kinesis limits). When an application takes more time to process
> records in the run loop, it can no longer read at a frequency of 5 reads/sec (even though its
> fetchIntervalMillis may be set to 200 ms). In such a scenario, maxNumberOfRecordsPerFetch should
> be calculated based on the time that the run loop actually takes, as opposed to fetchIntervalMillis.
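
The calculation described above can be sketched roughly as follows. This is an illustrative sketch only, not the code from pull request #6408: the class name AdaptiveReadSketch, the two constants (2 MB/sec taken as 2 * 1024 * 1024 bytes, and a cap of 10,000 records per fetch), and the extra currentMax parameter are assumptions made for the example.

```java
// Illustrative sketch; names and constants are assumptions, not the PR's code.
class AdaptiveReadSketch {
    // Assumed per-shard read limit: 2 MB/sec, taken here as 2 * 1024 * 1024 bytes.
    static final long SHARD_BYTES_PER_SECOND_LIMIT = 2L * 1024L * 1024L;
    // Assumed upper bound on the number of records requested per fetch.
    static final int MAX_RECORDS_PER_FETCH = 10_000;

    /**
     * Derives the number of records to request on the next read from the
     * measured duration of one pass through the run loop.
     *
     * @param runLoopTimeNanos measured time of one full loop pass, sleep included
     * @param numRecords number of records returned by the last read
     * @param recordBatchSizeBytes total size in bytes of the last batch
     * @param currentMax value to keep when nothing can be measured
     */
    static int adaptRecordsToRead(long runLoopTimeNanos, int numRecords,
                                  long recordBatchSizeBytes, int currentMax) {
        if (runLoopTimeNanos <= 0 || numRecords <= 0 || recordBatchSizeBytes <= 0) {
            return currentMax; // no usable measurement, keep the previous value
        }
        double averageRecordSizeBytes = (double) recordBatchSizeBytes / numRecords;
        // How often the loop actually runs, in passes per second.
        double loopFrequencyHz = 1_000_000_000.0 / runLoopTimeNanos;
        // Byte budget available to each read at that measured frequency.
        double bytesPerRead = SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
        int records = (int) (bytesPerRead / averageRecordSizeBytes);
        // Clamp to the range [1, MAX_RECORDS_PER_FETCH].
        return Math.max(1, Math.min(records, MAX_RECORDS_PER_FETCH));
    }
}
```

Under these assumptions, a batch averaging 1024 bytes per record yields 409 records per fetch when the loop takes 200 ms, and 819 when the loop slows to 400 ms, so a slower loop is compensated by a larger read.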



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
