flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7902) TwoPhaseCommitSinkFunctions should use custom TypeSerializer
Date Mon, 30 Oct 2017 12:49:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224863#comment-16224863 ]

ASF GitHub Bot commented on FLINK-7902:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4919#discussion_r147692117
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
---
    @@ -362,4 +374,213 @@ public void setContext(Optional<CONTEXT> context) {
     			this.context = context;
     		}
     	}
    +
    +	/**
    +	 * Custom {@link TypeSerializer} for the sink state.
    +	 */
    +	static final class StateSerializer<TXN, CONTEXT> extends TypeSerializer<State<TXN,
CONTEXT>> {
    +
    +		private final TypeSerializer<TXN> transactionSerializer;
    +		private final TypeSerializer<CONTEXT> contextSerializer;
    +
    +		public StateSerializer(
    +				TypeSerializer<TXN> transactionSerializer,
    +				TypeSerializer<CONTEXT> contextSerializer) {
    +			this.transactionSerializer = checkNotNull(transactionSerializer);
    +			this.contextSerializer = checkNotNull(contextSerializer);
    +		}
    +
    +		@Override
    +		public boolean isImmutableType() {
    +			return transactionSerializer.isImmutableType() && contextSerializer.isImmutableType();
    +		}
    +
    +		@Override
    +		public TypeSerializer<State<TXN, CONTEXT>> duplicate() {
    +			return new StateSerializer<>(
    +					transactionSerializer.duplicate(), contextSerializer.duplicate());
    +		}
    +
    +		@Override
    +		public State<TXN, CONTEXT> createInstance() {
    +			return null;
    +		}
    +
    +		@Override
    +		public State<TXN, CONTEXT> copy(State<TXN, CONTEXT> from) {
    +			TXN copiedPendingTransaction = transactionSerializer.copy(from.getPendingTransaction());
    +			List<TXN> copiedPendingCommitTransactions = new ArrayList<>();
    +			for (TXN txn : from.getPendingCommitTransactions()) {
    +				copiedPendingCommitTransactions.add(transactionSerializer.copy(txn));
    +			}
    +			Optional<CONTEXT> copiedContext = from.getContext().map(contextSerializer::copy);
    +			return new State<>(copiedPendingTransaction, copiedPendingCommitTransactions,
copiedContext);
    +		}
    +
    +		@Override
    +		public State<TXN, CONTEXT> copy(
    +				State<TXN, CONTEXT> from,
    +				State<TXN, CONTEXT> reuse) {
    +			return copy(from);
    +		}
    +
    +		@Override
    +		public int getLength() {
    +			return -1;
    +		}
    +
    +		@Override
    +		public void serialize(
    +				State<TXN, CONTEXT> record,
    +				DataOutputView target) throws IOException {
    +			transactionSerializer.serialize(record.getPendingTransaction(), target);
    +			List<TXN> pendingCommitTransactions = record.getPendingCommitTransactions();
    +			target.writeInt(pendingCommitTransactions.size());
    +			for (TXN pendingTxn : pendingCommitTransactions) {
    +				transactionSerializer.serialize(pendingTxn, target);
    +			}
    +			Optional<CONTEXT> context = record.getContext();
    +			if (context.isPresent()) {
    +				target.writeBoolean(true);
    +				contextSerializer.serialize(context.get(), target);
    +			} else {
    +				target.writeBoolean(false);
    +			}
    +		}
    +
    +		@Override
    +		public State<TXN, CONTEXT> deserialize(DataInputView source) throws IOException
{
    +			TXN pendingTxn = transactionSerializer.deserialize(source);
    +			int numPendingCommitTxns = source.readInt();
    +			List<TXN> pendingCommitTxns = new ArrayList<>(numPendingCommitTxns);
    +			for (int i = 0; i < numPendingCommitTxns; i++) {
    +				pendingCommitTxns.add(transactionSerializer.deserialize(source));
    +			}
    +			Optional<CONTEXT> context = Optional.empty();
    +			boolean hasContext = source.readBoolean();
    +			if (hasContext) {
    +				context = Optional.of(contextSerializer.deserialize(source));
    +			}
    +			return new State<>(pendingTxn, pendingCommitTxns, context);
    +		}
    +
    +		@Override
    +		public State<TXN, CONTEXT> deserialize(
    +				State<TXN, CONTEXT> reuse,
    +				DataInputView source) throws IOException {
    +			return deserialize(source);
    +		}
    +
    +		@Override
    +		public void copy(
    +				DataInputView source, DataOutputView target) throws IOException {
    +			TXN pendingTxn = transactionSerializer.deserialize(source);
    +			transactionSerializer.serialize(pendingTxn, target);
    +			int numPendingCommitTxns = source.readInt();
    +			target.writeInt(numPendingCommitTxns);
    +			for (int i = 0; i < numPendingCommitTxns; i++) {
    +				TXN pendingCommitTxn = transactionSerializer.deserialize(source);
    +				transactionSerializer.serialize(pendingCommitTxn, target);
    +			}
    +			boolean hasContext = source.readBoolean();
    --- End diff --
    
    fixing


> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> ------------------------------------------------------------
>
>                 Key: FLINK-7902
>                 URL: https://issues.apache.org/jira/browse/FLINK-7902
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {})}} to create a {{TypeInformation}}, which in turn is used to create a {{StateDescriptor}} for the state that the Kafka sink stores.
> Behind the scenes, this would be roughly analysed as a {{PojoType(GenericType<KafkaTransactionState>, GenericType<KafkaTransactionContext>)}}, which means we don't have explicit control over the serialisation format and we also use Kryo (which is the default for {{GenericTypeInfo}}). This can be problematic if we want to evolve the state schema in the future or if we want to change Kryo versions.
> We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor:
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a custom-made {{TypeSerializer}} for the state.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message