flink-user mailing list archives

From Kien Truong <duckientru...@gmail.com>
Subject Re: How to get past "bad" Kafka message, restart, keep state
Date Wed, 20 Jun 2018 13:59:36 GMT
Hi,

You can use a FlatMapFunction instead of a MapFunction, and only collect the valid elements.
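For example, here is a rough, untested sketch (the SafeNumberMapper name is just
illustrative, assuming the "key,value" format from your example). Records that
fail to parse are simply skipped instead of throwing and failing the job:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

private static class SafeNumberMapper
		implements FlatMapFunction<String, Tuple2<String, Integer>> {

	@Override
	public void flatMap(String input, Collector<Tuple2<String, Integer>> out) {
		String[] tokens = input.toLowerCase().split(",");
		if (tokens.length != 2) {
			// Malformed record, skip it.
			return;
		}
		try {
			// Only valid "key,integer" records are emitted downstream.
			out.collect(new Tuple2<>(tokens[0], Integer.parseInt(tokens[1].trim())));
		} catch (NumberFormatException e) {
			// "Poison" message like "set1,foobar": skip it instead of crashing.
		}
	}
}

Then just replace map() with flatMap() in your pipeline:

DataStream<Tuple2<String, Integer>> sums = messageStream
	.flatMap(new SafeNumberMapper())
	.keyBy(0)
	.sum(1);

Since the parse failure is caught, the job never fails on the bad record and
simply continues past it.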


Regards,

Kien


On 6/20/2018 7:57 AM, chrisr123 wrote:
> First time I'm trying to get this to work, so bear with me. I'm trying to
> learn checkpointing with Kafka and how to handle "bad" messages, restarting
> without losing state.
>
> Use Case:
> Use checkpointing.
> Read a stream of integers from Kafka, keep a running sum.
> If a "bad" Kafka message read, restart app, skip the "bad" message, keep
> state.
> My stream would something look like this:
>
> set1,5
> set1,7
> set1,foobar
> set1,6
>
> I want my app to keep a running sum of the integers it has seen, and restart
> if it crashes without losing state. So my running sum would be:
> 5,
> 12,
> app crashes and restarts
> 18
>
> However, I'm finding that when my app restarts, it keeps reading the bad
> "foobar" message and doesn't get past it. Source code below. The mapper bombs
> when I try to parse "foobar" as an Integer.
> How can I modify the app to get past the "poison" message?
>
> env.enableCheckpointing(1000L);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
> env.getCheckpointConfig().setCheckpointTimeout(10000);
> env.setStateBackend(new FsStateBackend("hdfs://mymachine:9000/flink/checkpoints"));
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", BROKERS);
> properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
> properties.setProperty("group.id", "consumerGroup1");
>
> FlinkKafkaConsumer08<String> kafkaConsumer =
> 		new FlinkKafkaConsumer08<>(topicName, new SimpleStringSchema(), properties);
> DataStream<String> messageStream = env.addSource(kafkaConsumer);
>
> DataStream<Tuple2<String,Integer>> sums = messageStream
> 	.map(new NumberMapper())
> 	.keyBy(0)
> 	.sum(1);
> sums.print();
>
>
> 	private static class NumberMapper implements
> 			MapFunction<String,Tuple2<String,Integer>> {
> 		public Tuple2<String,Integer> map(String input) throws Exception {
> 			return parseData(input);
> 		}
>
> 		private Tuple2<String,Integer> parseData(String record) {
>
> 			String[] tokens = record.toLowerCase().split(",");
>
> 			// Get Key
> 			String key = tokens[0];
>
> 			// Get Integer Value
> 			String integerValue = tokens[1];
> 			System.out.println("Trying to Parse=" + integerValue);
> 			Integer value = Integer.parseInt(integerValue);
>
> 			// Build Tuple
> 			return new Tuple2<String,Integer>(key, value);
> 		}
>
> 	}
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
