flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7784) Don't fail TwoPhaseCommitSinkFunction when failing to commit
Date Mon, 30 Oct 2017 11:39:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224753#comment-16224753 ]

ASF GitHub Bot commented on FLINK-7784:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4910#discussion_r147678943
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -442,13 +445,31 @@ public FlinkKafkaProducer011(
     			throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
     		}
     
    -		if (!producerConfig.contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
    +		if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
     			long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
     			checkState(timeout < Integer.MAX_VALUE && timeout > 0, "timeout does not fit into 32 bit integer");
     			this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
     			LOG.warn("Property [%s] not specified. Setting it to %s", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
     		}
     
    +		// Enable transactionTimeoutWarnings to avoid silent data loss
    +		// See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
    +		// The KafkaProducer may not throw an exception if the transaction failed to commit
    +		if (semantic == Semantic.EXACTLY_ONCE) {
    +			final Object object = this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
    --- End diff --
    
    Couldn't find any.
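
    For context on the contains -> containsKey change in the diff above: java.util.Properties inherits contains(Object) from Hashtable, and that method tests values, not keys. The sketch below illustrates the difference with plain JDK classes. The class and method names are made up for illustration, and the literal "transaction.timeout.ms" stands in for ProducerConfig.TRANSACTION_TIMEOUT_CONFIG so that no Kafka dependency is needed.

```java
import java.util.Properties;

public class PropertiesContainsDemo {

    /** Key lookup, which is what the corrected check in the diff performs. */
    static boolean hasKey(Properties props, String key) {
        return props.containsKey(key);
    }

    /** Hashtable.contains(Object) searches the values, not the keys. */
    static boolean hasValue(Properties props, Object value) {
        return props.contains(value);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Stand-in for ProducerConfig.TRANSACTION_TIMEOUT_CONFIG.
        props.put("transaction.timeout.ms", 3600000);

        System.out.println(hasKey(props, "transaction.timeout.ms"));   // prints "true"
        System.out.println(hasValue(props, "transaction.timeout.ms")); // prints "false"
        System.out.println(hasValue(props, 3600000));                  // prints "true"
    }
}
```

    With the old contains call, a user-supplied timeout would not be detected as present, so the default would be applied on top of it.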


> Don't fail TwoPhaseCommitSinkFunction when failing to commit
> ------------------------------------------------------------
>
>                 Key: FLINK-7784
>                 URL: https://issues.apache.org/jira/browse/FLINK-7784
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Aljoscha Krettek
>            Assignee: Gary Yao
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Currently, {{TwoPhaseCommitSinkFunction}} will fail if committing fails (either when doing it via the completed checkpoint notification or when trying to commit after restoring after failure). This means that the job will go into an infinite recovery loop because we will always keep failing.
> In some cases it might be better to ignore those failures and keep on processing and this should be the default. We can provide an option that allows failing the sink on failing commits.
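
The behaviour proposed in the issue description could look roughly like the following. This is only a sketch under assumptions, not the code from the pull request: the Committer interface, the commitOrIgnore helper and the failOnCommitError flag are hypothetical names chosen for illustration, and none of them are Flink APIs.

```java
public class CommitFailureSketch {

    /** Hypothetical stand-in for the commit step of a two-phase-commit sink. */
    interface Committer<T> {
        void commit(T transaction) throws Exception;
    }

    /**
     * Tries to commit a transaction. If the commit fails and failOnCommitError
     * is false, the failure is logged and processing continues; otherwise the
     * failure is rethrown so that the sink (and the job) fails.
     *
     * @return true if the commit succeeded, false if a failure was ignored
     */
    static <T> boolean commitOrIgnore(Committer<T> committer, T transaction, boolean failOnCommitError) {
        try {
            committer.commit(transaction);
            return true;
        } catch (Exception e) {
            if (failOnCommitError) {
                throw new RuntimeException("Commit failed for transaction " + transaction, e);
            }
            // Default proposed in the issue: keep processing instead of looping in recovery.
            System.err.println("Ignoring failed commit of " + transaction + ": " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        Committer<String> alwaysFails = txn -> {
            throw new IllegalStateException("transaction timed out");
        };

        // Failure is ignored, so the caller can keep processing.
        boolean committed = commitOrIgnore(alwaysFails, "txn-1", false);
        System.out.println("committed = " + committed); // prints "committed = false"

        // With the opt-in flag, the same failure propagates.
        try {
            commitOrIgnore(alwaysFails, "txn-2", true);
        } catch (RuntimeException e) {
            System.out.println("sink failed: " + e.getMessage());
        }
    }
}
```

Note that ignoring a failed commit trades the recovery loop for possible data loss, which is why the issue suggests keeping an option to fail the sink instead.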



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
