flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10353) Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written with Semantic.AT_LEAST_ONCE fails with NPE
Date Tue, 06 Nov 2018 09:59:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16676470#comment-16676470 ]

ASF GitHub Bot commented on FLINK-10353:
----------------------------------------

pnowojski commented on a change in pull request #7010: [FLINK-10353][kafka] Support change of transactional semantics in Kaf…
URL: https://github.com/apache/flink/pull/7010#discussion_r231059615
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
 ##########
 @@ -566,6 +568,76 @@ public void testRunOutOfProducersInThePool() throws Exception {
 		deleteTestTopic(topic);
 	}
 
+	@Test
+	public void testMigrateFromAtLeastOnceToExactlyOnce() throws Exception {
+		String topic = "testMigrateFromAtLeastOnceToExactlyOnce";
+
+		OperatorSubtaskState producerSnapshot;
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic, AT_LEAST_ONCE)) {
+			testHarness.setup();
+			testHarness.open();
+			testHarness.processElement(42, 0);
+			testHarness.snapshot(0, 1);
+			testHarness.processElement(43, 2);
+			testHarness.notifyOfCompletedCheckpoint(0);
+			producerSnapshot = testHarness.snapshot(1, 3);
+			testHarness.processElement(44, 4);
+		}
+
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic, EXACTLY_ONCE)) {
+			testHarness.setup();
+			// restore from snapshot; all records up to this point should be persisted
+			testHarness.initializeState(producerSnapshot);
+			testHarness.open();
+
+			// write and commit more records
+			testHarness.processElement(44, 7);
+			testHarness.snapshot(2, 8);
+			testHarness.processElement(45, 9);
+		}
+
+	// now we should have:
+		// - records 42, 43, 44 in directly flushed writes from at-least-once
+		// - aborted transactions with records 44 and 45
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 44), 30_000L);
+		deleteTestTopic(topic);
+	}
+
+	@Test
+	public void testMigrateFromExactlyOnceToAtLeastOnce() throws Exception {
+		String topic = "testMigrateFromExactlyOnceToAtLeastOnce";
+
+		OperatorSubtaskState producerSnapshot;
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic, EXACTLY_ONCE)) {
+			testHarness.setup();
+			testHarness.open();
+			testHarness.processElement(42, 0);
+			testHarness.snapshot(0, 1);
+			testHarness.processElement(43, 2);
+			testHarness.notifyOfCompletedCheckpoint(0);
+			producerSnapshot = testHarness.snapshot(1, 3);
+			testHarness.processElement(44, 4);
+		}
+
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic, AT_LEAST_ONCE)) {
+			testHarness.setup();
+			// restore from snapshot
+			testHarness.initializeState(producerSnapshot);
+			testHarness.open();
+
+			// write and commit more records, after potentially lingering transactions
+			testHarness.processElement(44, 7);
+			testHarness.snapshot(2, 8);
+			testHarness.processElement(45, 9);
+		}
+
+	// now we should have:
+	// - records 42 and 43 in committed transactions
+	// - records 44 and 45 flushed directly by the at-least-once producer
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 44, 45), 30_000L);
 
 Review comment:
   Could you provide an overloaded version of `assertExactlyOnceForTopic` with a default value of `long timeoutMillis = 30_000L`, as a separate commit? I know it was like that before, but I have only now realised how duplicated (mostly by me) the magic constant `30_000L` is everywhere...
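
   Something along these lines would do; just a sketch, assuming the current five-argument `assertExactlyOnceForTopic(Properties, String, int, List<Integer>, long)` signature stays in place:

	// Hypothetical convenience overload: delegates to the existing assertion
	// with the default timeout, so call sites can drop the magic constant.
	public void assertExactlyOnceForTopic(
			Properties properties,
			String topic,
			int partition,
			List<Integer> expectedElements) throws Exception {
		assertExactlyOnceForTopic(properties, topic, partition, expectedElements, 30_000L);
	}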

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written with Semantic.AT_LEAST_ONCE fails with NPE
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10353
>                 URL: https://issues.apache.org/jira/browse/FLINK-10353
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.5.3, 1.6.0
>            Reporter: Konstantin Knauf
>            Assignee: Stefan Richter
>            Priority: Critical
>              Labels: pull-request-available
>
> If a KafkaProducer with {{Semantic.EXACTLY_ONCE}} is restored from a savepoint written with {{Semantic.AT_LEAST_ONCE}}, the job fails on restore with the NPE below. This makes it impossible to upgrade an AT_LEAST_ONCE pipeline to an EXACTLY_ONCE pipeline statefully.
> {quote}
> java.lang.NullPointerException
> 	at java.util.Hashtable.put(Hashtable.java:460)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:955)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:733)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93)
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:373)
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:333)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:867)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:748){quote}
> The reason is that for {{Semantic.AT_LEAST_ONCE}} the snapshotted state of the {{TwoPhaseCommitSinkFunction}} is of the form "TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1537175471175}".
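> In other words, the restored {{transactionalId}} is null, and {{initTransactionalProducer}} puts it into the producer's {{Properties}}, which extends {{Hashtable}} and rejects null values. A minimal sketch of the failure mode (variable names are illustrative):
> {code:java}
> // Properties extends Hashtable, and Hashtable.put rejects null values,
> // which matches the Hashtable.put frame at the top of the stack trace.
> java.util.Properties producerConfig = new java.util.Properties();
> String transactionalId = null; // as restored from an AT_LEAST_ONCE snapshot
> producerConfig.put("transactional.id", transactionalId); // throws NullPointerException
> {code}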



