flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7637) FlinkKinesisProducer violates at-least-once guarantees
Date Mon, 23 Oct 2017 15:21:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16215290#comment-16215290 ]

ASF GitHub Bot commented on FLINK-7637:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4871#discussion_r146301761
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
---
    @@ -49,61 +71,205 @@ public void testCreateWithNonSerializableDeserializerFails() {
     		exception.expect(IllegalArgumentException.class);
     		exception.expectMessage("The provided serialization schema is not serializable");
     
    -		Properties testConfig = new Properties();
    -		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    -		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -		new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), testConfig);
    +		new FlinkKinesisProducer<>(new NonSerializableSerializationSchema(), getStandardProperties());
     	}
     
     	@Test
     	public void testCreateWithSerializableDeserializer() {
    -		Properties testConfig = new Properties();
    -		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    -		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -		new FlinkKinesisProducer<>(new SerializableSerializationSchema(), testConfig);
    +		new FlinkKinesisProducer<>(new SerializableSerializationSchema(), getStandardProperties());
     	}
     
     	@Test
     	public void testConfigureWithNonSerializableCustomPartitionerFails() {
     		exception.expect(IllegalArgumentException.class);
     		exception.expectMessage("The provided custom partitioner is not serializable");
     
    -		Properties testConfig = new Properties();
    -		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    -		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -		new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
    +		new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
     			.setCustomPartitioner(new NonSerializableCustomPartitioner());
     	}
     
     	@Test
     	public void testConfigureWithSerializableCustomPartitioner() {
    -		Properties testConfig = new Properties();
    -		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    -		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -		new FlinkKinesisProducer<>(new SimpleStringSchema(), testConfig)
    +		new FlinkKinesisProducer<>(new SimpleStringSchema(), getStandardProperties())
     			.setCustomPartitioner(new SerializableCustomPartitioner());
     	}
     
     	@Test
     	public void testConsumerIsSerializable() {
    -		Properties testConfig = new Properties();
    -		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    -		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    -
    -		FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new
SimpleStringSchema(), testConfig);
    +		FlinkKinesisProducer<String> consumer = new FlinkKinesisProducer<>(new
SimpleStringSchema(), getStandardProperties());
     		assertTrue(InstantiationUtil.isSerializable(consumer));
     	}
     
     	// ----------------------------------------------------------------------
    +	// Tests to verify at-least-once guarantee
    +	// ----------------------------------------------------------------------
    +
    +	/**
    +	 * Test ensuring that if an invoke call happens right after an async exception is caught,
it should be rethrown.
    +	 */
    +	@SuppressWarnings("ResultOfMethodCallIgnored")
    +	@Test
    +	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +		final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new
SimpleStringSchema());
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		producer.getPendingRecordFutures().get(0).setException(new Exception("artificial async
exception"));
    +
    +		try {
    +			testHarness.processElement(new StreamRecord<>("msg-2"));
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    +			Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "artificial async exception").isPresent());
    +
    +			// test succeeded
    +			return;
    +		}
    +
    +		Assert.fail();
    +	}
    +
    +	/**
    +	 * Test ensuring that if a snapshot call happens right after an async exception is caught,
it should be rethrown.
    +	 */
    +	@SuppressWarnings("ResultOfMethodCallIgnored")
    +	@Test
    +	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +		final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new
SimpleStringSchema());
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +		producer.getPendingRecordFutures().get(0).setException(new Exception("artificial async
exception"));
    +
    +		try {
    +			testHarness.snapshot(123L, 123L);
    +		} catch (Exception e) {
    +			// the next invoke should rethrow the async exception
    --- End diff --
    
    nit: the comment refers to `invoke`, but this test exercises a checkpoint (snapshot); it was probably copy-pasted from the test above.


> FlinkKinesisProducer violates at-least-once guarantees
> ------------------------------------------------------
>
>                 Key: FLINK-7637
>                 URL: https://issues.apache.org/jira/browse/FLINK-7637
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> Currently, the {{FlinkKinesisProducer}} does not flush outstanding KPL records when a checkpoint is taken.
> As with the earlier at-least-once issue in the Flink Kafka producer, this may lead to data
> loss: if records fail asynchronously after the checkpoint they were part of has already completed,
> that checkpoint no longer covers them, and they are not replayed on recovery.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message