flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GJL <...@git.apache.org>
Subject [GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...
Date Mon, 30 Oct 2017 11:48:13 GMT
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4910#discussion_r147680902
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
---
    @@ -293,26 +330,110 @@ public void initializeState(FunctionInitializationContext context)
throws Except
     		}
     		// if in restore we didn't get any userContext or we are initializing from scratch
     		if (userContext == null) {
    -			LOG.info("{} - no state to restore", name());
    +			log.info("{} - no state to restore", name());
     
     			userContext = initializeUserContext();
     		}
     		this.pendingCommitTransactions.clear();
     
    -		currentTransaction = beginTransaction();
    -		LOG.debug("{} - started new transaction '{}'", name(), currentTransaction);
    +		currentTransaction = beginTransaction0();
    +		log.debug("{} - started new transaction '{}'", name(), currentTransaction);
    +	}
    +
    +	/**
    +	 * This method must be the only place to call {@link #beginTransaction()} to ensure
that the
    +	 * {@link TransactionHolder} is created at the same time.
    +	 */
    +	private TransactionHolder<TXN> beginTransaction0() throws Exception {
    --- End diff --
    
    Will rename.


---

Mime
View raw message