flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Patrick Fial <patrick.f...@id1.de>
Subject Maintaining message input order in streams with keyBy/filter/connect
Date Wed, 02 Jan 2019 11:51:29 GMT
I am using apache flink to build a rather complex network of data streams. The idea is, to
implement a rule engine with flink.

As a basic description of the application, this is how it is supposed to work:

Data is received by a kafka consumer source, and processed with a number of data streams,
until it is finally sent to a kafka producer sink. The incoming data contains objects with
a logical key ("object-id"), and the incoming messages may refer to the same object-id. For
every given object-id, the order of its incoming messages must be retained throughout the
application. The order of overall messages can be arbitrary.

This means, messages a,b and c of object1 must be processed in-order, however message x of
object2 might be processed in between a1/b1/c1, before, or after, it does not matter.

For my current understanding this means I must keyBy(_.objectID), so that messages of the
same object are processed in the order they arrived.

*Current approach*
To implement the actual rule engine, a network of streams is created. The idea is the following:

- each rule will have 1-n conditions
- for every condition of every rule create a sub-stream of the original stream with .filter(_.matches(rule.condition))
- combine all sub-streams which correspond to the same rule by using substream1.connect(substream2).flatMap(new
- connect can only join 2 streams, so a rule with 3 conditions will result in subsequent 2
- rules using the same condition will re-use the same sub-stream created in the second step.

This will result in n joined streams, where n corresponds to the number of rules. The joined
streams will have a map function appended to them, which marks the message, so that we know
that a rule matched.

Each joined/result stream may publish its result ("rule xyz matched") to the kafka producer
independently from the other results, so at this point I can attach the sink to the streams.

*Connect details*
Because the .connect of two streams ("condition"-substreams) must only pass a message, if
it was received on both streams (^= both conditions matched), I need a RichCoFlatMapFunction
with a keyed state, which can take care of the "pass only if it was received already on the
other side".

However, the problem is, that the stream is keyed by object-id. So what happens if 2 messages
of the same object run through the network and reach the .connect().map(new RichCoFlatMapFunction...)?
It will lead to wrong ouput. I would need to assign each incoming message a unique ID (UUID)
upon entering the network, so I can use this key (instead of the object-id) in the .connect().map()..
join. But at the same time, I need the stream to be keyed by object-id, so that messages of
the same objects are processed in-order. What to do?

To solve this, I kept the input-stream with keyBy(_.objectID), but the RichCoFlatMapFunction
in the stream-join no longer uses the keyed-state. Instead, I am using a simple operator state,
which keeps a map of passed objects, but implements the same logic, just with manual key/value

This seems to work, however I don't know if this introduces more issues.

The flink GUI will render this image, for a list of 14 rules with a total of 23 conditions
(some rules only have one condition):


The creation of the network is achieved using this code:

val streamCache = mutable.Map[Int,DataStream[WorkingMemory]]()
val outputNodesCache = ListBuffer[DataStream[WorkingMemory]]()

if (rules.isEmpty)

// create partial streams for all conditions (first level)
// cache the sub-stream with the hashcode of its condition as key (for re-use)

for (rule <- rules if rule.checks.nonEmpty ;
     cond <- rule.checks if !streamCache.contains(cond.hashCode()))
  streamCache += cond.hashCode -> sourceStream.filter(cond.matches _)

// create joined streams for combined conditions (sub-levels)

for (rule <- rules if rule.checks.nonEmpty)
  val ruleName = rule.ruleID

  // for each rule, starting with the rule with the least conditions ...

  if (rule.checks.size == 1)
    // ... create exit node if single-condition rule
    // each exit node applies the rule-name to the objects set of matched rules.

    outputNodesCache += streamCache(rule.checks.head.hashCode).map(obj => { obj.matchedRule
= ListBuffer((ruleName, rule.objectType.mkString(":"), rule.statement)) ; obj })
    // ... iterate all conditions, and join nodes into full rule-path (reusing existing intermediate

    var sourceStream:DataStream[WorkingMemory] = streamCache(rule.checks.head.hashCode)
    var idString = rule.checks.head.idString

    for (i <- rule.checks.indices)
      if (i == rule.checks.size-1)
        // reached last condition of rule, create exit-node
        // each exit node applies the rule-name to the objects set of matched rules.

        val rn = ruleName
        val objectType = rule.objectType.mkString(":")
        val statement = rule.statement

        outputNodesCache += sourceStream.map(obj => { obj.matchedRule = ListBuffer((rn,
objectType, statement)) ; obj })
        // intermediate condition, create normal intermediate node

        val there = rule.checks(i+1)
        val connectStream = streamCache(there.hashCode)

        idString += (":" + there.idString)

        // try to re-use existing tree-segments

        if (streamCache.contains(idString.hashCode))
          sourceStream = streamCache(idString.hashCode)
          sourceStream = sourceStream.connect(connectStream).flatMap(new StatefulCombineFunction(idString))

// connect each output-node to the sink

for (stream <- outputNodesCache)
  stream.map(wm => RuleEvent.toXml(wm, wm.matchedRule.headOption)).addSink(sink)

The StatefulCombineFunction used in the previous snippet:

class StatefulCombineFunction(id:String) extends RichCoFlatMapFunction[WorkingMemory, WorkingMemory,
WorkingMemory] with CheckpointedFunction
  private var leftState:ListState[(String, WorkingMemory)] = _
  private var rightState:ListState[(String, WorkingMemory)] = _
  private var bufferedLeft = ListBuffer[(String, WorkingMemory)]()
  private var bufferedRight = ListBuffer[(String, WorkingMemory)]()

  override def flatMap1(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedLeft,
bufferedRight, xmlObject, out, "left")
  override def flatMap2(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedRight,
bufferedLeft, xmlObject, out, "right")

  def combine(leftState: ListBuffer[(String, WorkingMemory)], rightState: ListBuffer[(String,
WorkingMemory)], xmlObject:WorkingMemory, out: Collector[WorkingMemory], side:String): Unit
    val otherIdx:Int = leftState.indexWhere(_._1 == xmlObject.uuid)

    if (otherIdx > -1)
      rightState += ((xmlObject.uuid, xmlObject))

  override def initializeState(context:FunctionInitializationContext): Unit = ???
  override def snapshotState(context:FunctionSnapshotContext):Unit = ???

*Background information*
This application shall implement the rete-algorithm for rule matching using flink (https://en.wikipedia.org/wiki/Rete_algorithm).

A different approach would be to just loop all rules for every incoming message, and attach
the result. I have a working implementation for this approach using flink, so please don't
advise this as a solution.

The problem is, that the application messes up the order of incoming messages on the object-id
level. That is, it does not achieve what I required in the intro. For each object-id, the
incoming messages must keep the order. But this is not the case.

I don't know at which point in code the order gets messed up, or how those operations are
distributed amongst threads, so I don't know how to solve this issue.

Best regards
Patrick Fial
View raw message