flink-issues mailing list archives

From pnowojski <...@git.apache.org>
Subject [GitHub] flink pull request #4206: [FLINK-6996] FlinkKafkaProducer010 doesn't guarant...
Date Mon, 03 Jul 2017 14:52:18 GMT
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r125306972
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---
    @@ -172,6 +194,118 @@ public void cancel() {
     		}
     	}
     
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
    +		testOneToOneAtLeastOnce(true);
    +	}
    +
    +	/**
    +	 * Tests the at-least-once semantic for the simple writes into Kafka.
    +	 */
    +	@Test
    +	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
    +		testOneToOneAtLeastOnce(false);
    +	}
    +
    +	/**
    +	 * This test sets KafkaProducer so that it will not automatically flush the data and
    +	 * fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
    +	 */
    +	protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
    +		final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
    +		final int partition = 0;
    +		final int numElements = 1000;
    +		final int failAfterElements = 333;
    +
    +		createTestTopic(topic, 1, 1);
    +
    +		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
    +		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.enableCheckpointing(500);
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.getConfig().disableSysoutLogging();
    +
    +		Properties properties = new Properties();
    +		properties.putAll(standardProps);
    +		properties.putAll(secureProps);
    +		// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try to send pending (not flushed) data on close()
    +		properties.setProperty("timeout.ms", "10000");
    +		properties.setProperty("max.block.ms", "10000");
    +		// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
    +		properties.setProperty("batch.size", "10240000");
    +		properties.setProperty("linger.ms", "10000");
    +
    +		int leaderId = kafkaServer.getLeaderToShutDown(topic);
    +		BrokerRestartingMapper.resetState();
    +
    +		// process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
    +		DataStream<Integer> inputStream = env
    +			.fromCollection(getIntegersSequence(numElements))
    +			.map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
    +
    +		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +			@Override
    +			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +				return partition;
    +			}
    +		});
    +
    +		if (regularSink) {
    +			inputStream.addSink(kafkaSink.getUserFunction());
    +		}
    +		else {
    +			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +				@Override
    +				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +					return partition;
    +				}
    +			});
    +		}
    +
    +		FailingIdentityMapper.failedBefore = false;
    --- End diff --
    
    This is a static variable, and `FailingIdentityMapper` is used twice: first to test the regular
sink and then to test the custom sink operator. Without resetting this state, the second test run would fail.
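
To illustrate the point, here is a minimal, self-contained sketch of how a static "failed before" flag leaks from one run into the next. It does not use Flink: `StaticFlagResetSketch`, `FailOnceMapper` and `runFails` are hypothetical names made up for this example, and the real `FailingIdentityMapper` may differ in its details. The only assumption carried over is that the mapper fails at most once while its static flag stays set.

```java
public class StaticFlagResetSketch {

    // Hypothetical stand-in for a mapper that injects one artificial failure.
    static final class FailOnceMapper {
        // Static, so the value survives across mapper instances and test runs.
        static boolean failedBefore = false;

        private final int failAfter;
        private int seen = 0;

        FailOnceMapper(int failAfter) {
            this.failAfter = failAfter;
        }

        int map(int value) {
            seen++;
            if (!failedBefore && seen >= failAfter) {
                failedBefore = true;
                throw new IllegalStateException("artificial test failure");
            }
            return value;
        }
    }

    // Pushes numElements values through a fresh mapper.
    // Returns true if the artificial failure was triggered during this run.
    static boolean runFails(int numElements, int failAfter) {
        FailOnceMapper mapper = new FailOnceMapper(failAfter);
        try {
            for (int i = 0; i < numElements; i++) {
                mapper.map(i);
            }
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        FailOnceMapper.failedBefore = false;
        System.out.println(runFails(1000, 333)); // first run hits the failure: true

        // No reset here: the flag is still set, so the failure is skipped.
        System.out.println(runFails(1000, 333)); // false

        FailOnceMapper.failedBefore = false;     // reset, as the test does
        System.out.println(runFails(1000, 333)); // true again
    }
}
```

In this sketch, a second run that expects the injected failure only sees it if the flag was reset first, which is why the reset sits in the shared test body rather than in one of the two callers.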


