flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [22/28] flink git commit: [hotfix] [kinesis] Use ArrayList for faster shard state updates
Date Tue, 06 Feb 2018 19:03:22 GMT
[hotfix] [kinesis] Use ArrayList for faster shard state updates

Previously, the Kinesis Consumer used a LinkedList as the underlying
implementation for subscribedShardsState. This list is accessed on every
record to update a shard's state via a shard state index (i.e., the
position of the shard state in the list). With a LinkedList, this access
has linear time complexity, and since it happens per record, it can
severely hurt performance in jobs that consume a large number of Kinesis
shards.

This commit changes the list implementation to an ArrayList for
constant-time shard state access. The downside is that when new shards
are discovered, the ArrayList may occasionally need a costly resize of its
backing array. However, since resharding is not expected to happen often,
this is acceptable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9276ee51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9276ee51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9276ee51

Branch: refs/heads/release-1.4
Commit: 9276ee51aa3d671095ddc8e1335e58f03700db39
Parents: 7884a4f
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Thu Feb 1 13:05:56 2018 +0100
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Tue Feb 6 17:31:32 2018 +0100

----------------------------------------------------------------------
 .../connectors/kinesis/internals/KinesisDataFetcher.java      | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9276ee51/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 79e4bfd..4634421 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -178,8 +179,8 @@ public class KinesisDataFetcher<T> {
 			runtimeContext,
 			configProps,
 			deserializationSchema,
-			new AtomicReference<Throwable>(),
-			new LinkedList<KinesisStreamShardState>(),
+			new AtomicReference<>(),
+			new ArrayList<>(),
 			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
 			KinesisProxy.create(configProps));
 	}
@@ -192,7 +193,7 @@ public class KinesisDataFetcher<T> {
 								Properties configProps,
 								KinesisDeserializationSchema<T> deserializationSchema,
 								AtomicReference<Throwable> error,
-								LinkedList<KinesisStreamShardState> subscribedShardsState,
+								List<KinesisStreamShardState> subscribedShardsState,
 								HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
 								KinesisProxyInterface kinesis) {
 		this.streams = checkNotNull(streams);


Mime
View raw message