flink-issues mailing list archives

From pnowojski <...@git.apache.org>
Subject [GitHub] flink pull request #4910: [FLINK-7784] [kafka-producer] Don't fail TwoPhaseC...
Date Mon, 30 Oct 2017 11:03:02 GMT
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4910#discussion_r147656635
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
    @@ -58,18 +61,37 @@
     		extends RichSinkFunction<IN>
     		implements CheckpointedFunction, CheckpointListener {
     
    -	private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
    +	private final Logger log;
     
    -	protected final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +	private final Clock clock;
     
    -	protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
    +	protected final LinkedHashMap<Long, TransactionHolder<TXN>> pendingCommitTransactions = new LinkedHashMap<>();
     
    -	@Nullable
    -	protected TXN currentTransaction;
     	protected Optional<CONTEXT> userContext;
     
     	protected ListState<State<TXN, CONTEXT>> state;
     
    +	private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    +
    +	private TransactionHolder<TXN> currentTransaction;
    +
    +	/**
    +	 * Specifies the maximum time a transaction should remain open.
    +	 */
    +	private long transactionTimeout = Long.MAX_VALUE;
    +
    +	/**
    +	 * If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
    +	 * propagated.
    +	 */
    +	private boolean failureOnCommitAfterTransactionTimeoutDisabled;
    --- End diff --
    
    Rename this field to `propagateTransactionTimeouts` or `ignoreTransactionTimeouts`.
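
    To illustrate how the second suggested name might read at the call site, here is a minimal sketch. `CommitSketch`, its `warnings` list, and the `Runnable`-based `recoverAndCommit` are hypothetical stand-ins, not the actual `TwoPhaseCommitSinkFunction` code, and the sketch only mirrors the javadoc in the diff (catch instead of propagate); it does not check whether the transaction actually timed out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the sink; not the Flink class. */
class CommitSketch {

    /** If true, exceptions thrown while committing a recovered transaction are caught instead of propagated. */
    private boolean ignoreTransactionTimeouts;

    /** Collected in a list instead of a real logger, to keep the sketch self-contained. */
    final List<String> warnings = new ArrayList<>();

    /** Fluent setter (an assumption; the PR may expose this differently). */
    CommitSketch ignoreTransactionTimeouts(boolean ignore) {
        this.ignoreTransactionTimeouts = ignore;
        return this;
    }

    /** Runs the commit action and applies the flag to any failure. */
    void recoverAndCommit(Runnable commit) {
        try {
            commit.run();
        } catch (RuntimeException e) {
            if (!ignoreTransactionTimeouts) {
                throw e; // default: propagate the failure
            }
            warnings.add("Ignored failure while committing recovered transaction: " + e.getMessage());
        }
    }
}
```

    With the positive name, the condition `if (!ignoreTransactionTimeouts) throw e;` reads directly, whereas a name ending in `Disabled` forces the reader to work through a double negative.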


---
