flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit
Date Mon, 30 Oct 2017 11:04:03 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224671#comment-16224671 ]

ASF GitHub Bot commented on FLINK-7784:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4910#discussion_r147661454
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
    @@ -293,26 +330,110 @@ public void initializeState(FunctionInitializationContext context) throws Except
     		}
     		// if in restore we didn't get any userContext or we are initializing from scratch
     		if (userContext == null) {
    -			LOG.info("{} - no state to restore", name());
    +			log.info("{} - no state to restore", name());
     
     			userContext = initializeUserContext();
     		}
     		this.pendingCommitTransactions.clear();
     
    -		currentTransaction = beginTransaction();
    -		LOG.debug("{} - started new transaction '{}'", name(), currentTransaction);
    +		currentTransaction = beginTransaction0();
    +		log.debug("{} - started new transaction '{}'", name(), currentTransaction);
    +	}
    +
    +	/**
    +	 * This method must be the only place to call {@link #beginTransaction()} to ensure that the
    +	 * {@link TransactionHolder} is created at the same time.
    +	 */
    +	private TransactionHolder<TXN> beginTransaction0() throws Exception {
    +		return new TransactionHolder<>(beginTransaction(), clock.millis());
    +	}
    +
    +	/**
    +	 * This method must be the only place to call {@link #recoverAndCommit(Object)} to ensure that
    +	 * the configuration parameters {@link #transactionTimeout} and
    +	 * {@link #failureOnCommitAfterTransactionTimeoutDisabled} are respected.
    +	 */
    +	private void recoverAndCommit(TransactionHolder<TXN> transactionHolder) {
    +		try {
    +			logWarningIfTimeoutAlmostReached(transactionHolder);
    +			recoverAndCommit(transactionHolder.handle);
    +		} catch (final Exception e) {
    +			final long elapsedTime = clock.millis() - transactionHolder.transactionStartTime;
    +			if (failureOnCommitAfterTransactionTimeoutDisabled && elapsedTime > transactionTimeout) {
    +				log.error("Error while committing transaction {}. " +
    --- End diff --
    
    `"Error while committing transaction {}. Data loss might have occurred"`
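The quoted hunk stops mid-statement inside `recoverAndCommit`, so the rest of the method is not shown here. As a minimal sketch only, assuming the start time is read from an injectable `java.time.Clock` when the transaction begins, the decision the hunk appears to build toward looks like this: a failed commit is logged and swallowed only when `failureOnCommitAfterTransactionTimeoutDisabled` is set and the transaction has been open longer than `transactionTimeout` (in milliseconds); any other failure still fails the sink. `Holder`, `Committer`, `ManualClock` and `CommitGuard` are hypothetical names, not Flink classes, the `logWarningIfTimeoutAlmostReached` call is left out, and the log text is modeled on the reviewer's suggestion.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TransactionHolder: a transaction handle plus the
// clock reading (epoch millis) taken when the transaction was started.
final class Holder<TXN> {
    final TXN handle;
    final long transactionStartTime;

    Holder(TXN handle, long transactionStartTime) {
        this.handle = handle;
        this.transactionStartTime = transactionStartTime;
    }
}

// Hypothetical commit callback; may throw, like a real external system.
interface Committer<TXN> {
    void commit(TXN handle) throws Exception;
}

// A Clock that a test can move forward by hand instead of sleeping.
final class ManualClock extends Clock {
    private long millis;

    ManualClock(long millis) { this.millis = millis; }

    void advance(long deltaMillis) { millis += deltaMillis; }

    @Override public ZoneId getZone() { return ZoneOffset.UTC; }
    @Override public Clock withZone(ZoneId zone) { return this; }
    @Override public Instant instant() { return Instant.ofEpochMilli(millis); }
}

// Hypothetical helper holding the timeout-gated commit decision.
final class CommitGuard<TXN> {
    private final Clock clock;
    private final long transactionTimeout; // millis
    private final boolean failureOnCommitAfterTransactionTimeoutDisabled;
    final List<String> errors = new ArrayList<>(); // stands in for log.error(...)

    CommitGuard(Clock clock, long transactionTimeout,
                boolean failureOnCommitAfterTransactionTimeoutDisabled) {
        this.clock = clock;
        this.transactionTimeout = transactionTimeout;
        this.failureOnCommitAfterTransactionTimeoutDisabled =
                failureOnCommitAfterTransactionTimeoutDisabled;
    }

    // The only place a Holder is created, so every handle carries its start time.
    Holder<TXN> begin(TXN handle) {
        return new Holder<>(handle, clock.millis());
    }

    // Swallows a commit failure only if the flag is set AND the transaction
    // has been open longer than transactionTimeout; otherwise fails the sink.
    void recoverAndCommit(Holder<TXN> holder, Committer<TXN> committer) {
        try {
            committer.commit(holder.handle);
        } catch (Exception e) {
            long elapsedTime = clock.millis() - holder.transactionStartTime;
            if (failureOnCommitAfterTransactionTimeoutDisabled && elapsedTime > transactionTimeout) {
                errors.add("Error while committing transaction " + holder.handle
                        + ". Data loss might have occurred.");
                return; // keep the job running
            }
            throw new RuntimeException("Failed to commit " + holder.handle, e);
        }
    }
}

class CommitGuardDemo {
    public static void main(String[] args) {
        ManualClock clock = new ManualClock(0);
        CommitGuard<String> guard = new CommitGuard<>(clock, 1_000, true);
        Holder<String> txn = guard.begin("txn-1");
        clock.advance(5_000); // well past the 1 s timeout
        guard.recoverAndCommit(txn, h -> { throw new IllegalStateException("expired"); });
        System.out.println(guard.errors); // one logged error, no exception thrown
    }
}
```

Reading time through an injected `Clock` lets a test move past the timeout without sleeping, which appears to be why the diff calls `clock.millis()` rather than `System.currentTimeMillis()`.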


> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> ------------------------------------------------------------
>
>                 Key: FLINK-7784
>                 URL: https://issues.apache.org/jira/browse/FLINK-7784
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Aljoscha Krettek
>            Assignee: Gary Yao
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails (either when
> committing via the completed-checkpoint notification or when trying to commit after
> restoring from a failure). This means that the job will go into an infinite recovery
> loop, because each restore attempts the commit again and fails again.
> In some cases it might be better to ignore those failures and keep on processing, and
> this should be the default. We can provide an option that makes the sink fail when a
> commit fails.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
