flink-issues mailing list archives

From aljoscha <...@git.apache.org>
Subject [GitHub] flink pull request #3746: [FLINK-4022] [kafka] Partition and topic discovery...
Date Thu, 04 May 2017 14:38:37 GMT
GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3746#discussion_r114760511
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -424,65 +485,136 @@ public void run(SourceContext<T> sourceContext) throws Exception {
     			throw new Exception("The partitions were not set for the consumer");
     		}
     
    -		// we need only do work, if we actually have partitions assigned
    -		if (!subscribedPartitionsToStartOffsets.isEmpty()) {
    -
    -			// create the fetcher that will communicate with the Kafka brokers
    -			final AbstractFetcher<T, ?> fetcher = createFetcher(
    -					sourceContext,
    -					subscribedPartitionsToStartOffsets,
    -					periodicWatermarkAssigner,
    -					punctuatedWatermarkAssigner,
    -					(StreamingRuntimeContext) getRuntimeContext(),
    -					offsetCommitMode);
    -
    -			// publish the reference, for snapshot-, commit-, and cancel calls
    -			// IMPORTANT: We can only do that now, because only now will calls to
    -			//            the fetchers 'snapshotCurrentState()' method return at least
    -			//            the restored offsets
    -			this.kafkaFetcher = fetcher;
    -			if (!running) {
    -				return;
    -			}
    -			
    -			// (3) run the fetcher' main work method
    -			fetcher.runFetchLoop();
    +		this.runThread = Thread.currentThread();
    +
    +		// mark the subtask as temporarily idle if there are no initial seed partitions;
    +		// once this subtask discovers some partitions and starts collecting records, the subtask's
    +		// status will automatically be triggered back to be active.
    +		if (subscribedPartitionsToStartOffsets.isEmpty()) {
    +			sourceContext.markAsTemporarilyIdle();
     		}
    -		else {
    -			// this source never completes, so emit a Long.MAX_VALUE watermark
    -			// to not block watermark forwarding
    -			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
     
    -			// wait until this is canceled
    -			final Object waitLock = new Object();
    +		// create the fetcher that will communicate with the Kafka brokers
    +		final AbstractFetcher<T, ?> fetcher = createFetcher(
    +				sourceContext,
    +				subscribedPartitionsToStartOffsets,
    +				periodicWatermarkAssigner,
    +				punctuatedWatermarkAssigner,
    +				(StreamingRuntimeContext) getRuntimeContext(),
    +				offsetCommitMode);
    +
    +		// publish the reference, for snapshot-, commit-, and cancel calls
    +		// IMPORTANT: We can only do that now, because only now will calls to
    +		//            the fetchers 'snapshotCurrentState()' method return at least
    +		//            the restored offsets
    +		this.kafkaFetcher = fetcher;
    +
    +		if (!running) {
    +			return;
    +		}
    +
    +		// depending on whether we were restored with the current state version (1.3),
    +		// remaining logic branches off into 2 paths:
    +		//  1) New state - main fetcher loop executed as separate thread, with this
    +		//                 thread running the partition discovery loop
    +		//  2) Old state - partition discovery is disabled, simply going into the main fetcher loop
    +
    +		if (!restoredFromOldState) {
    +			final AtomicReference<Exception> fetcherErrorRef = new AtomicReference<>();
    +			Thread fetcherThread = new Thread(new Runnable() {
    +				@Override
    +				public void run() {
    +					try {
    +						// run the fetcher' main work method
    +						kafkaFetcher.runFetchLoop();
    --- End diff --
    
    Before this change, the fetcher ran in the Task thread. I'm not sure that's strictly necessary anymore, but in the past there were always problems when a thread other than the Task's main thread emitted elements.
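For reference, the diff moves `runFetchLoop()` onto a newly created thread and keeps a failure slot in an `AtomicReference<Exception>` (`fetcherErrorRef`). The general pattern it starts can be sketched as follows. This is a minimal, self-contained illustration, not the PR's code: `FetchLoop`, `runOnWorkerThread` and the thread name `fetcher-thread` are hypothetical, and Flink specifics such as the checkpoint lock and cancellation are left out.

```java
import java.util.concurrent.atomic.AtomicReference;

public class FetcherThreadSketch {

    /** Hypothetical stand-in for a blocking fetch loop; NOT Flink's AbstractFetcher API. */
    public interface FetchLoop {
        void runFetchLoop() throws Exception;
    }

    /**
     * Runs the fetch loop on a separate worker thread and rethrows any failure
     * on the calling (task) thread once the worker has finished.
     */
    public static void runOnWorkerThread(FetchLoop loop) throws Exception {
        final AtomicReference<Exception> errorRef = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                // anything the loop emits is now emitted from this worker thread
                loop.runFetchLoop();
            } catch (Exception e) {
                // record the failure so the calling thread can rethrow it
                errorRef.set(e);
            }
        }, "fetcher-thread");
        worker.start();

        // in the diff, the calling thread would run partition discovery here;
        // this sketch simply waits for the worker to finish
        worker.join();

        Exception error = errorRef.get();
        if (error != null) {
            throw error;
        }
    }
}
```

With this arrangement every element is emitted from `fetcher-thread` rather than from the task thread, which is the situation the concern above is about.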
    
    Is there a good reason for not starting the partition discoverer in a separate thread?
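The alternative asked about here keeps the fetch loop, and therefore all element emission, on the calling task thread and moves periodic discovery to a helper thread instead. A minimal sketch under stated assumptions: `Fetcher`, `Discoverer`, `discoverOnce()`, `run()` and `intervalMillis` are hypothetical names rather than Flink's API, and `runFetchLoop()` is assumed to return once `cancel()` has been called.

```java
import java.util.concurrent.atomic.AtomicReference;

public class DiscoveryThreadSketch {

    /** Hypothetical fetcher abstraction; NOT Flink's AbstractFetcher API. */
    public interface Fetcher {
        void runFetchLoop() throws Exception; // assumed to block until cancel() or failure
        void cancel();
    }

    /** Hypothetical single discovery pass; throws if discovery fails. */
    public interface Discoverer {
        void discoverOnce() throws Exception;
    }

    public static void run(Fetcher fetcher, Discoverer discoverer, long intervalMillis) throws Exception {
        final AtomicReference<Exception> discoveryError = new AtomicReference<>();

        final Thread discoveryThread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    discoverer.discoverOnce();
                    Thread.sleep(intervalMillis);
                }
            } catch (InterruptedException e) {
                // normal shutdown: the fetch loop has finished and interrupted this thread
            } catch (Exception e) {
                discoveryError.set(e);
                // unblock the fetch loop running on the task thread
                fetcher.cancel();
            }
        }, "partition-discovery");
        discoveryThread.setDaemon(true);
        discoveryThread.start();

        try {
            // elements are still emitted from the calling (task) thread
            fetcher.runFetchLoop();
        } finally {
            // stop discovery whether the fetch loop returned or threw
            discoveryThread.interrupt();
            discoveryThread.join();
        }

        Exception error = discoveryError.get();
        if (error != null) {
            throw error;
        }
    }
}
```

A discovery failure cancels the fetcher so the blocking loop on the task thread returns, and the error is then rethrown there; if the fetch loop itself throws, the `finally` block still stops and joins the discovery thread before that exception propagates.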


