flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] tweise commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
Date Tue, 20 Jul 2021 03:39:54 GMT

tweise commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r672782651



##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -110,69 +106,64 @@ public Boundedness getBoundedness() {
 
     @Override
     public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() {
-        // List<SimpleVersionedSerializer<SourceSplit>> serializers = new ArrayList<>();
-        // TODO: serializers are created on demand as underlying sources are created during switch
-        // sources.forEach(t -> serializers.add(castSerializer(t.source.getSplitSerializer())));
         return new HybridSourceSplitSerializer(switchedSources);
     }
 
     @Override
     public SimpleVersionedSerializer<HybridSourceEnumeratorState>
             getEnumeratorCheckpointSerializer() {
-        List<SimpleVersionedSerializer<Object>> serializers = new ArrayList<>();
-        sources.forEach(
-                t -> serializers.add(castSerializer(t.source.getEnumeratorCheckpointSerializer())));
-        return new HybridSourceEnumeratorStateSerializer(serializers);
-    }
-
-    private static <T> SimpleVersionedSerializer<T> castSerializer(
-            SimpleVersionedSerializer<? extends T> s) {
-        @SuppressWarnings("rawtypes")
-        SimpleVersionedSerializer s1 = s;
-        return s1;
+        return new HybridSourceEnumeratorStateSerializer(switchedSources);
     }
 
     /**
-     * Callback for switch time customization of the underlying source, typically to dynamically set
-     * a start position from previous enumerator end state.
+     * Factory for underlying sources of {@link HybridSource}.
      *
-     * <p>Requires the ability to augment the existing source (or clone and modify). Provides the
-     * flexibility to set start position in any way a source allows, in a source specific way.
-     * Future convenience could be built on top of it, for example an implementation recognizes
-     * optional interfaces.
+     * <p>This factory permits building of a source at graph construction time or deferred at switch
+     * time. Provides the ability to set a start position in any way a specific source allows.
+     * Future convenience could be built on top of it, for example a default implementation that
+     * recognizes optional interfaces to transfer position in a common format.
      *
      * <p>Called when the current enumerator has finished and before the next enumerator is created.
-     * The enumerator end state can thus be used to set the next source's start position.
+     * The enumerator end state can thus be used to set the next source's start position. Only
+     * required for dynamic position transfer at time of switching.
      *
-     * <p>Only required for dynamic position transfer at time of switching, otherwise source can be
-     * preconfigured with a start position during job submission.
+     * <p>If start position is known at job submission, the source can be constructed in the entry
+     * point and simply wrapped into the factory, providing the benefit of validation during
+     * submission.
      */
-    public interface SourceConfigurer<SourceT extends Source, FromEnumT extends SplitEnumerator>
+    public interface SourceFactory<T, SourceT extends Source, FromEnumT extends SplitEnumerator>
             extends Serializable {
-        SourceT configure(SourceT source, FromEnumT enumerator);
+        SourceT create(FromEnumT enumerator);

Review comment:
       @becketqin thank you for the detailed analysis. I agree that there should be a way
to extract the end position from `FileSource`. As mentioned in the FLIP, we would like to
keep this work separate so that we can also consider a potential uniform end position
format. Meanwhile, `SourceFactory` and `SourceSwitchContext` provide the mechanism to solve
this in a source-specific way. We intend to use it internally with a custom source, and other
sources may provide their own way to access the end position. I hope the augmented examples
make this clearer and that the intended boundary for this PR also makes sense. CC: @AHeise
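For readers following the thread, here is a minimal, self-contained Java sketch of the two uses the `SourceFactory` Javadoc in the diff describes: deferring construction to switch time so the next source's start position is read from the finished enumerator, or wrapping a source that was already built at job submission. All types and methods here (`Source`, `SplitEnumerator`, `FileEnumerator.getEndTimestamp`, `StreamingSource`, `deferred`, `wrap`) are hypothetical stand-ins, not Flink's actual classes; the record type parameter `T` and `SourceSwitchContext` are left out for brevity.

```java
import java.io.Serializable;

// Hypothetical sketch only: simplified stand-ins, NOT Flink's real API.
class SourceFactorySketch {

    // Stand-in for a source; modeled as Serializable.
    interface Source extends Serializable {}

    // Stand-in for a split enumerator.
    interface SplitEnumerator {}

    // Same shape as the interface in the diff, minus the record type T:
    // build the next source from the enumerator that has just finished.
    interface SourceFactory<SourceT extends Source, FromEnumT extends SplitEnumerator>
            extends Serializable {
        SourceT create(FromEnumT enumerator);
    }

    // Hypothetical enumerator of the bounded first source that exposes its end position.
    static final class FileEnumerator implements SplitEnumerator {
        private final long endTimestamp;

        FileEnumerator(long endTimestamp) {
            this.endTimestamp = endTimestamp;
        }

        long getEndTimestamp() {
            return endTimestamp;
        }
    }

    // Hypothetical follow-up source that can start from a given timestamp.
    static final class StreamingSource implements Source {
        final long startTimestamp;

        StreamingSource(long startTimestamp) {
            this.startTimestamp = startTimestamp;
        }
    }

    // Deferred construction: the start position is taken from the previous
    // enumerator's end state at switch time (source-specific transfer).
    static SourceFactory<StreamingSource, FileEnumerator> deferred() {
        return enumerator -> new StreamingSource(enumerator.getEndTimestamp());
    }

    // Start position known at job submission: the source is built up front
    // and simply wrapped; the enumerator argument is ignored.
    static <S extends Source, E extends SplitEnumerator> SourceFactory<S, E> wrap(S prebuilt) {
        return enumerator -> prebuilt;
    }

    public static void main(String[] args) {
        FileEnumerator finished = new FileEnumerator(1_626_750_000_000L);
        StreamingSource next = deferred().create(finished);
        System.out.println(next.startTimestamp); // prints 1626750000000

        StreamingSource prebuilt = new StreamingSource(0L);
        StreamingSource same =
                SourceFactorySketch.<StreamingSource, FileEnumerator>wrap(prebuilt)
                        .create(finished);
        System.out.println(same == prebuilt); // prints true
    }
}
```

The deferred form corresponds to the source-specific position transfer discussed above: only the factory lambda knows how to read the (hypothetical) `FileEnumerator` end state. The wrapped form ignores the enumerator entirely, which matches the Javadoc's point that a source constructed in the entry point is validated during submission.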




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


