flink-user mailing list archives

From Patrick Fial <patrick.f...@id1.de>
Subject Manual error/recovery handling in flink applications
Date Thu, 31 Jan 2019 07:07:52 GMT
I am working on an application based on Apache Flink, which uses Apache Kafka for input
and output.
I have the requirement that all incoming messages received via Kafka must be processed in order,
as well as be safely stored in a persistence layer (database), and that no message may get lost.

The streaming part of this application is rather trivial/small, as the main logic boils
down to something like:

environment.addSource(consumer)    // 1) DataStream[Option[Elem]]
  .filter(_.isDefined)             // 2) discard unparsable messages
  .map(_.get)                      // 3) unwrap Option
  .map(InputEvent.fromXml(_))      // 4) convert from XML to internal representation
  .keyBy(_.id)                     // 5) ensure in-order processing per logical key
  .map(new DBFunction)             // 6) database lookup, store of update and additional
  .map(InputEvent.toXml(_))        // 7) convert back to XML
  .addSink(producer)               // 8) attach Kafka producer sink

Several error situations could occur in this pipeline:

- the database becomes unavailable (shutdown, tablespace full, ...)
- changes cannot be stored because of logical errors (e.g. column format errors)
- the Kafka producer cannot send a message because the broker is unavailable

and probably others.
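One common way to approach these cases (an assumption here, not something Flink prescribes) is to classify each failure by how it should be handled: transient errors such as an unreachable database are retried for the same message, errors that are permanent for one message (e.g. a column format violation) are parked in a dead-letter destination, and anything unrecognised fails the job so Flink restarts it from the last checkpoint. The sketch below maps the standard JDBC exception hierarchy (`SQLTransientException`, `SQLRecoverableException`, `SQLNonTransientException` from `java.sql`) onto those actions; `ErrorHandling`, `ErrorAction` and its cases are hypothetical names, and whether a particular driver reports a database shutdown as a transient or recoverable exception depends on that driver.

```scala
import java.sql.{SQLNonTransientException, SQLRecoverableException, SQLTransientException}

// Hypothetical classification of pipeline failures into handling strategies.
object ErrorHandling {
  sealed trait ErrorAction

  // Transient failure (e.g. DB temporarily unreachable): wait and retry the same message.
  case object RetryInPlace extends ErrorAction

  // Failure that is permanent for this message (e.g. a column format violation):
  // park the message in a dead-letter destination and continue with the next one.
  case object DeadLetter extends ErrorAction

  // Anything unrecognised: rethrow so Flink restarts the job from the last checkpoint.
  case object FailJob extends ErrorAction

  def classify(err: Throwable): ErrorAction = err match {
    case _: SQLTransientException | _: SQLRecoverableException => RetryInPlace
    case _: SQLNonTransientException                            => DeadLetter
    case _                                                      => FailJob
  }
}
```

Dead-lettering keeps the stream moving, but later messages for the same key then overtake the parked one, so it trades the strict per-key ordering requirement for availability; where ordering is mandatory, such a message may need manual intervention instead.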

Now my question is: how can I ensure consistency, as described above, in those situations, when
I would in fact have to do something like:

a) Stream operator 6) detects a problem (DB unavailable)
b) The DB connection of the DBFunction object must be recovered, which might only succeed
after some minutes
c) This means that overall processing must be suspended, ideally for the whole pipeline, so
that incoming messages are not loaded into memory
d) Processing resumes after the database has been recovered, starting exactly with
the message which encountered the problem at 1)
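Steps b) to d) could be approximated by retrying the database call inside operator 6) instead of letting the exception escape. While `map` does not return, the task thread is blocked; Flink's backpressure then stops upstream operators and eventually the Kafka source from emitting more records, so nothing accumulates beyond the bounded network buffers, and the retried message is exactly the one that failed. A minimal sketch, assuming a plain blocking retry with exponential backoff; `Retry.withBackoff` and its parameters are hypothetical names, not a Flink API.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of Flink: blocking retry with exponential backoff.
object Retry {
  /** Evaluates `op` until it succeeds, sleeping between attempts. The delay starts at
    * `initialDelayMs`, doubles after every failure and is capped at `maxDelayMs`.
    * After `maxAttempts` failed attempts the last exception is rethrown.
    * `sleep` is a parameter only so the waiting can be replaced in tests.
    */
  def withBackoff[T](
      maxAttempts: Int,
      initialDelayMs: Long,
      maxDelayMs: Long,
      sleep: Long => Unit = (ms: Long) => Thread.sleep(ms)
  )(op: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")

    @tailrec
    def loop(attempt: Int, delayMs: Long): T =
      Try(op) match { // `op` is by-name, so it is re-evaluated on every attempt
        case Success(value) => value
        case Failure(err) if attempt >= maxAttempts =>
          throw err // give up: the exception reaches Flink's restart strategy
        case Failure(_) =>
          sleep(delayMs) // blocks the task thread, which back-pressures upstream
          loop(attempt + 1, math.min(delayMs * 2, maxDelayMs))
      }

    loop(1, initialDelayMs)
  }
}
```

Inside `DBFunction.map` the database write could then be wrapped as `Retry.withBackoff(maxAttempts = 60, initialDelayMs = 1000, maxDelayMs = 30000)(store(event))`, where `store` stands for the existing database call (which could also re-open the connection before writing). Two caveats: while the operator is blocked, checkpoint barriers cannot pass it, so checkpoints will not complete and may time out; and `Try` only catches non-fatal errors, so an `InterruptedException` raised when Flink cancels the task still propagates.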

Now, I know that there are at least two tools for failure handling:

- Kafka consumer offsets
- Apache Flink checkpoints
However, searching the docs, I fail to see how either of those could be used in the middle
of stream processing from within a single operator.
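The two mechanisms are not used from inside a single operator; they work together at the job level. With checkpointing enabled, the Flink Kafka consumer stores its partition offsets in each checkpoint. When an operator throws, Flink restarts the job according to its restart strategy (e.g. `RestartStrategies.fixedDelayRestart`), restores all operator state from the last completed checkpoint and rewinds the source to the offsets stored there; offsets committed back to Kafka are only used for monitoring and as the initial start position. The toy model below (plain Scala, not Flink's API; `CheckpointReplayModel` is a hypothetical name) illustrates the consequence: records after the last checkpoint are processed again.

```scala
import scala.util.control.NonFatal

// Toy model, not Flink's API: a source reading an ordered log whose read position
// is snapshotted periodically. On failure the "job" is restarted from the last
// snapshot, similar to Flink restoring a checkpoint and rewinding the Kafka source.
object CheckpointReplayModel {
  /** Feeds `log` to `process` in order and returns every record that was handed to
    * `process`, including records replayed after a restart.
    */
  def run(log: Vector[String], checkpointEvery: Int, maxRestarts: Int)
         (process: String => Unit): Vector[String] = {
    require(checkpointEvery >= 1, "checkpointEvery must be at least 1")
    var checkpointedOffset = 0 // source offset stored in the last completed checkpoint
    var offset = 0             // current read position of the source
    var restarts = 0
    val seen = Vector.newBuilder[String]

    while (offset < log.size) {
      val record = log(offset)
      seen += record
      try {
        process(record)
        offset += 1
        // a checkpoint completes whenever the offset reaches a multiple of checkpointEvery
        if (offset % checkpointEvery == 0) checkpointedOffset = offset
      } catch {
        case NonFatal(e) =>
          if (restarts >= maxRestarts) throw e // restart budget exhausted: job fails
          restarts += 1
          offset = checkpointedOffset          // restore: rewind to the checkpointed offset
      }
    }
    seen.result()
  }
}
```

For example, with records `m1` to `m5`, a checkpoint every two records and a single failure on the first attempt at `m4`, the returned sequence is `m1, m2, m3, m4, m3, m4, m5`: `m3` is processed again although it had already succeeded. The database writes in `DBFunction` would therefore need to be idempotent (for example an upsert keyed by message id), while on the Kafka output side Flink's `TwoPhaseCommitSinkFunction` is the mechanism behind exactly-once delivery.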

So, what would be the recommended strategies for fine-grained error handling and recovery
in a streaming application?


Patrick Fial

Client Platform Developer

Information Design One AG

Phone +49 69 244 502 38

Web www.id1.de <http://www.id1.de/>

Information Design One AG, Baseler Straße 10, 60329 Frankfurt am Main

Commercial register: Amtsgericht (Local Court) Frankfurt am Main, HRB 52596

Executive board: Robert Peters, Benjamin Walther; Supervisory board: Christian Hecht (chair)