flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl0u <...@git.apache.org>
Subject [GitHub] flink pull request #4919: [FLINK-7902] Use TypeSerializer in TwoPhaseCommitS...
Date Mon, 30 Oct 2017 12:39:03 GMT
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4919#discussion_r147674429
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
---
    @@ -993,14 +1000,162 @@ public String toString() {
     	 * Context associated to this instance of the {@link FlinkKafkaProducer011}. User for
keeping track of the
     	 * transactionalIds.
     	 */
    -	public static class KafkaTransactionContext {
    -		public final Set<String> transactionalIds;
    +	static class KafkaTransactionContext {
    +		final Set<String> transactionalIds;
     
    -		public KafkaTransactionContext(Set<String> transactionalIds) {
    +		KafkaTransactionContext(Set<String> transactionalIds) {
     			this.transactionalIds = transactionalIds;
     		}
     	}
     
    +	static class TransactionStateSerializer extends TypeSerializerSingleton<KafkaTransactionState>
{
    +		@Override
    +		public boolean isImmutableType() {
    +			return true;
    +		}
    +
    +		@Override
    +		public KafkaTransactionState createInstance() {
    +			return null;
    +		}
    +
    +		@Override
    +		public KafkaTransactionState copy(KafkaTransactionState from) {
    +			return from;
    +		}
    +
    +		@Override
    +		public KafkaTransactionState copy(
    +			KafkaTransactionState from,
    +			KafkaTransactionState reuse) {
    +			return from;
    +		}
    +
    +		@Override
    +		public int getLength() {
    +			return -1;
    +		}
    +
    +		@Override
    +		public void serialize(
    +				KafkaTransactionState record,
    +				DataOutputView target) throws IOException {
    +			if (record.transactionalId == null) {
    +				target.writeBoolean(false);
    +			} else {
    +				target.writeBoolean(true);
    +				target.writeUTF(record.transactionalId);
    +			}
    +			target.writeLong(record.producerId);
    +			target.writeShort(record.epoch);
    +		}
    +
    +		@Override
    +		public KafkaTransactionState deserialize(DataInputView source) throws IOException {
    +			String transactionalId = null;
    +			if (source.readBoolean()) {
    +				transactionalId = source.readUTF();
    +			}
    +			long producerId = source.readLong();
    +			short epoch = source.readShort();
    +			return new KafkaTransactionState(transactionalId, producerId, epoch, null);
    +		}
    +
    +		@Override
    +		public KafkaTransactionState deserialize(
    +				KafkaTransactionState reuse,
    +				DataInputView source) throws IOException {
    +			return deserialize(source);
    +		}
    +
    +		@Override
    +		public void copy(
    +				DataInputView source, DataOutputView target) throws IOException {
    +			boolean hasTransactionalId = source.readBoolean();
    --- End diff --
    
    For uniformity, this can become: `target.writeBoolean(source.readBoolean())`.


---

Mime
View raw message